diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfiguration.java index 5526f7e995a8..24e33d671c62 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfiguration.java @@ -36,6 +36,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -88,12 +89,14 @@ static class ProcessorContainerConfiguration { @ConditionalOnMissingBean EventHubsProcessorFactory defaultEventHubsNamespaceProcessorFactory( NamespaceProperties properties, + ApplicationContext applicationContext, CheckpointStore checkpointStore, ObjectProvider> suppliers, ObjectProvider tokenCredentialResolvers, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) ObjectProvider defaultTokenCredentials) { DefaultEventHubsNamespaceProcessorFactory factory = new DefaultEventHubsNamespaceProcessorFactory(checkpointStore, properties, suppliers.getIfAvailable()); + factory.setApplicationContext(applicationContext); factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable()); factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable()); return factory; @@ -108,10 +111,12 @@ static class EventHubsTemplateConfiguration { @ConditionalOnMissingBean EventHubsProducerFactory defaultEventHubsNamespaceProducerFactory( NamespaceProperties properties, + ApplicationContext applicationContext, ObjectProvider> suppliers, ObjectProvider tokenCredentialResolvers, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) ObjectProvider defaultTokenCredentials) { DefaultEventHubsNamespaceProducerFactory factory = new DefaultEventHubsNamespaceProducerFactory(properties, suppliers.getIfAvailable()); + factory.setApplicationContext(applicationContext); factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable()); factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable()); return factory; diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfiguration.java index 6a410336a85b..37de7a174347 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfiguration.java @@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -92,6 +93,7 @@ static class ProcessorContainerConfiguration { @ConditionalOnMissingBean ServiceBusProcessorFactory defaultServiceBusNamespaceProcessorFactory( NamespaceProperties properties, + ApplicationContext applicationContext, ObjectProvider> suppliers, ObjectProvider tokenCredentialResolvers, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) ObjectProvider defaultTokenCredentials, @@ -99,6 +101,7 @@ ServiceBusProcessorFactory defaultServiceBusNamespaceProcessorFactory( ObjectProvider> processorClientBuilderCustomizers, ObjectProvider> sessionProcessorClientBuilderCustomizers) { DefaultServiceBusNamespaceProcessorFactory factory = new DefaultServiceBusNamespaceProcessorFactory(properties, suppliers.getIfAvailable()); + factory.setApplicationContext(applicationContext); factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable()); factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable()); clientBuilderCustomizers.orderedStream().forEach(factory::addServiceBusClientBuilderCustomizer); @@ -115,12 +118,14 @@ static class ConsumerContainerConfiguration { @ConditionalOnMissingBean ServiceBusConsumerFactory defaultServiceBusNamespaceConsumerFactory( NamespaceProperties properties, + ApplicationContext applicationContext, ObjectProvider> suppliers, ObjectProvider tokenCredentialResolvers, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) ObjectProvider defaultTokenCredentials, ObjectProvider> customizers, ObjectProvider> sessionReceiverCustomizers) { DefaultServiceBusNamespaceConsumerFactory factory = new DefaultServiceBusNamespaceConsumerFactory(properties, suppliers.getIfAvailable()); + factory.setApplicationContext(applicationContext); factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable()); factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable()); customizers.orderedStream().forEach(factory::addServiceBusClientBuilderCustomizer); @@ -136,12 +141,14 @@ static class ServiceBusTemplateConfiguration { @ConditionalOnMissingBean ServiceBusProducerFactory defaultServiceBusNamespaceProducerFactory( NamespaceProperties properties, + ApplicationContext applicationContext, ObjectProvider> suppliers, ObjectProvider tokenCredentialResolvers, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) ObjectProvider defaultTokenCredentials, ObjectProvider> clientBuilderCustomizers, ObjectProvider> senderClientBuilderCustomizers) { DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(properties, suppliers.getIfAvailable()); + factory.setApplicationContext(applicationContext); factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable()); factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable()); clientBuilderCustomizers.orderedStream().forEach(factory::addServiceBusClientBuilderCustomizer); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfigurationTests.java index 87daf3997d50..24dbb96eb7e0 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/AzureEventHubsMessagingAutoConfigurationTests.java @@ -5,6 +5,7 @@ import com.azure.core.credential.TokenCredential; import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; import com.azure.spring.cloud.autoconfigure.implementation.context.AzureGlobalPropertiesAutoConfiguration; import com.azure.spring.cloud.autoconfigure.implementation.context.AzureTokenCredentialAutoConfiguration; import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.configuration.TestCheckpointStore; @@ -148,48 +149,43 @@ void testCustomTokenCredentialConfiguration() { AzureTokenCredentialAutoConfiguration.class, AzureGlobalPropertiesAutoConfiguration.class)) .withBean(EventHubsMessageConverter.class, EventHubsMessageConverter::new) + .withBean(CheckpointStore.class, TestCheckpointStore::new) .withPropertyValues( "spring.cloud.azure.eventhubs.connection-string=" + String.format(CONNECTION_STRING_FORMAT, "test-namespace"), "spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential" ) .withUserConfiguration(AzureEventHubsPropertiesTestConfiguration.class) .run(context -> { - - // Verify that the properties contain the correct credential bean name + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); AzureEventHubsProperties eventHubsProperties = context.getBean(AzureEventHubsProperties.class); - assertThat(eventHubsProperties).isNotNull(); - assertThat(eventHubsProperties.getCredential()).isNotNull(); + assertThat(eventHubsProperties.getCredential().getTokenCredentialBeanName()) - .as("The token-credential-bean-name property should be set to customTokenCredential") .isEqualTo("customTokenCredential"); - // Verify that the custom token credential bean exists - assertThat(context).hasBean("customTokenCredential"); - TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); - assertThat(customCredential).isNotNull(); - - // Verify the EventHubsProducerFactory has the tokenCredentialResolver configured - assertThat(context).hasSingleBean(EventHubsProducerFactory.class); EventHubsProducerFactory producerFactory = context.getBean(EventHubsProducerFactory.class); - assertThat(producerFactory).isNotNull(); - - // Verify tokenCredentialResolver resolves to the custom credential - Field tokenCredentialResolverField = producerFactory.getClass().getDeclaredField("tokenCredentialResolver"); - tokenCredentialResolverField.setAccessible(true); - Object tokenCredentialResolver = tokenCredentialResolverField.get(producerFactory); - assertThat(tokenCredentialResolver).as("TokenCredentialResolver should be configured").isNotNull(); - - // Cast to AzureCredentialResolver and invoke resolve() to verify it returns customTokenCredential - @SuppressWarnings("unchecked") - AzureCredentialResolver resolver = - (AzureCredentialResolver) tokenCredentialResolver; - TokenCredential resolvedCredential = resolver.resolve(eventHubsProperties); - assertThat(resolvedCredential) - .as("The resolved credential should be the customTokenCredential bean") + EventHubsProcessorFactory processorFactory = context.getBean(EventHubsProcessorFactory.class); + + // Validate credential resolution - without ApplicationContext propagation fix, + // tokenCredentialBeanName would be silently ignored and connection string would be used + assertThat(resolveCredential(producerFactory, eventHubsProperties)) + .isSameAs(customCredential); + assertThat(resolveCredential(processorFactory, eventHubsProperties)) .isSameAs(customCredential); + + // Validate runtime producer creation + EventHubProducerAsyncClient producer = producerFactory.createProducer("test-eventhub"); + producer.close(); }); } + @SuppressWarnings("unchecked") + private TokenCredential resolveCredential(Object factory, AzureEventHubsProperties properties) throws Exception { + Field field = factory.getClass().getDeclaredField("tokenCredentialResolver"); + field.setAccessible(true); + AzureCredentialResolver resolver = (AzureCredentialResolver) field.get(factory); + return resolver.resolve(properties); + } + @Configuration public static class CustomTokenCredentialConfiguration { @Bean diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfigurationTests.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfigurationTests.java index 0e60e03432c1..18890bedfdda 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfigurationTests.java @@ -8,6 +8,8 @@ import com.azure.spring.cloud.autoconfigure.implementation.context.AzureTokenCredentialAutoConfiguration; import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties; import com.azure.spring.cloud.core.credential.AzureCredentialResolver; +import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; +import com.azure.spring.messaging.servicebus.core.ServiceBusConsumerFactory; import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; @@ -150,44 +152,36 @@ void testCustomTokenCredentialConfiguration() { ) .withUserConfiguration(AzureServiceBusPropertiesTestConfiguration.class) .run(context -> { - - // Verify that the properties contain the correct credential bean name + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); AzureServiceBusProperties serviceBusProperties = context.getBean(AzureServiceBusProperties.class); - assertThat(serviceBusProperties).isNotNull(); - assertThat(serviceBusProperties.getCredential()).isNotNull(); + assertThat(serviceBusProperties.getCredential().getTokenCredentialBeanName()) - .as("The token-credential-bean-name property should be set to customTokenCredential") .isEqualTo("customTokenCredential"); - // Verify that the custom token credential bean exists - assertThat(context).hasBean("customTokenCredential"); - TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); - assertThat(customCredential).isNotNull(); - - // Verify the ServiceBusProducerFactory has the tokenCredentialResolver configured - assertThat(context).hasSingleBean(ServiceBusProducerFactory.class); ServiceBusProducerFactory producerFactory = context.getBean(ServiceBusProducerFactory.class); - assertThat(producerFactory).isNotNull(); - - // Verify tokenCredentialResolver resolves to the custom credential - Field tokenCredentialResolverField = - producerFactory.getClass().getDeclaredField("tokenCredentialResolver"); - tokenCredentialResolverField.setAccessible(true); - Object tokenCredentialResolver = tokenCredentialResolverField.get(producerFactory); - assertThat(tokenCredentialResolver) - .as("TokenCredentialResolver should be configured").isNotNull(); - - // Cast to AzureCredentialResolver and invoke resolve() to verify it returns customTokenCredential - @SuppressWarnings("unchecked") - AzureCredentialResolver resolver = - (AzureCredentialResolver) tokenCredentialResolver; - TokenCredential resolvedCredential = resolver.resolve(serviceBusProperties); - assertThat(resolvedCredential) - .as("The resolved credential should be the customTokenCredential bean") + ServiceBusProcessorFactory processorFactory = context.getBean(ServiceBusProcessorFactory.class); + ServiceBusConsumerFactory consumerFactory = context.getBean(ServiceBusConsumerFactory.class); + + assertThat(resolveCredential(producerFactory, serviceBusProperties)) .isSameAs(customCredential); + assertThat(resolveCredential(processorFactory, serviceBusProperties)) + .isSameAs(customCredential); + assertThat(resolveCredential(consumerFactory, serviceBusProperties)) + .isSameAs(customCredential); + + // Validate runtime producer creation + producerFactory.createProducer("test-queue", ServiceBusEntityType.QUEUE).close(); }); } + @SuppressWarnings("unchecked") + private TokenCredential resolveCredential(Object factory, AzureServiceBusProperties properties) throws Exception { + Field field = factory.getClass().getDeclaredField("tokenCredentialResolver"); + field.setAccessible(true); + AzureCredentialResolver resolver = (AzureCredentialResolver) field.get(factory); + return resolver.resolve(properties); + } + @Configuration public static class CustomTokenCredentialConfiguration { @Bean diff --git a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactory.java b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactory.java index e8177a24cdb2..060dfc07a26f 100644 --- a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactory.java +++ b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactory.java @@ -20,6 +20,8 @@ import com.azure.spring.cloud.service.implementation.servicebus.properties.ServiceBusClientCommonProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; import java.util.Arrays; import java.util.List; @@ -36,6 +38,7 @@ abstract class AbstractServiceBusSubClientBuilderFactory consumeProxyOptions() { return (builder, proxy) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.proxyOptions(proxy); + getServiceBusClientBuilder().proxyOptions(proxy); } }; } @@ -108,7 +122,7 @@ protected BiConsumer consumeProxyOptions() { protected BiConsumer consumeAmqpTransportType() { return (builder, t) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.transportType(t); + getServiceBusClientBuilder().transportType(t); } }; } @@ -117,7 +131,7 @@ protected BiConsumer consumeAmqpTransportType() { protected BiConsumer consumeAmqpRetryOptions() { return (builder, retry) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.retryOptions(retry); + getServiceBusClientBuilder().retryOptions(retry); } }; } @@ -126,7 +140,7 @@ protected BiConsumer consumeAmqpRetryOptions() { protected BiConsumer consumeClientOptions() { return (builder, client) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.clientOptions(client); + getServiceBusClientBuilder().clientOptions(client); } }; } @@ -141,17 +155,17 @@ protected List> getAuthenticationDescriptors(T build return Arrays.asList( new NamedKeyAuthenticationDescriptor(credential -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.credential(credential); + getServiceBusClientBuilder().credential(credential); } }), new SasAuthenticationDescriptor(credential -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.credential(credential); + getServiceBusClientBuilder().credential(credential); } }), new TokenAuthenticationDescriptor(this.tokenCredentialResolver, credential -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.credential(credential); + getServiceBusClientBuilder().credential(credential); } }) ); @@ -161,7 +175,7 @@ protected List> getAuthenticationDescriptors(T build protected BiConsumer consumeConfiguration() { return (builder, configuration) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.configuration(configuration); + getServiceBusClientBuilder().configuration(configuration); } }; } @@ -170,7 +184,7 @@ protected BiConsumer consumeConfiguration() { protected BiConsumer consumeDefaultTokenCredential() { return (builder, credential) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.credential(credential); + getServiceBusClientBuilder().credential(credential); } }; } @@ -179,7 +193,7 @@ protected BiConsumer consumeDefaultTokenCredential() { protected BiConsumer consumeConnectionString() { return (builder, connectionString) -> { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.connectionString(connectionString); + getServiceBusClientBuilder().connectionString(connectionString); } }; } @@ -187,11 +201,15 @@ protected BiConsumer consumeConnectionString() { @Override protected void configureService(T builder) { if (!isShareServiceBusClientBuilder()) { - this.serviceBusClientBuilder.fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + getServiceBusClientBuilder().fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } } protected ServiceBusClientBuilder getServiceBusClientBuilder() { + // Lazy initialization: build only when first accessed, ensuring ApplicationContext is available + if (serviceBusClientBuilder == null && serviceBusClientBuilderFactory != null) { + serviceBusClientBuilder = serviceBusClientBuilderFactory.build(); + } return serviceBusClientBuilder; } } diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactoryTests.java index 2d9905cc98ce..5be7bf0f0e83 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/AbstractServiceBusSubClientBuilderFactoryTests.java @@ -23,6 +23,7 @@ import static com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusClientBuilderFactoryTests.CONNECTION_STRING_FORMAT; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -145,7 +146,7 @@ private void verifyFqdnConfigured(boolean isShareServiceClientBuilder) { buildClient(builder); verify(factory.getServiceBusClientBuilder(), - times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } private void verifyClientSecretTokenCredentialConfigured(boolean isShareServiceClientBuilder) { @@ -206,7 +207,7 @@ private void verifyProxyPropertiesConfigured(boolean isShareServiceClientBuilder B builder = factory.build(); buildClient(builder); - verify(factory.getServiceBusClientBuilder(), times(1)).proxyOptions(any(ProxyOptions.class)); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).proxyOptions(any(ProxyOptions.class)); } private void verifyFixedRetryPropertiesConfigured(boolean isShareServiceClientBuilder) { @@ -221,7 +222,7 @@ private void verifyFixedRetryPropertiesConfigured(boolean isShareServiceClientBu B builder = factory.build(); buildClient(builder); - verify(factory.getServiceBusClientBuilder(), times(1)).retryOptions(any(AmqpRetryOptions.class)); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).retryOptions(any(AmqpRetryOptions.class)); } private void exponentialRetryPropertiesConfigured(boolean isShareServiceClientBuilder) { @@ -238,7 +239,7 @@ private void exponentialRetryPropertiesConfigured(boolean isShareServiceClientBu B builder = factory.build(); buildClient(builder); - verify(factory.getServiceBusClientBuilder(), times(1)).retryOptions(any(AmqpRetryOptions.class)); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).retryOptions(any(AmqpRetryOptions.class)); } private void verifyTransportTypeConfigured(boolean isShareServiceClientBuilder) { @@ -252,7 +253,7 @@ private void verifyTransportTypeConfigured(boolean isShareServiceClientBuilder) B builder = factory.build(); buildClient(builder); - verify(factory.getServiceBusClientBuilder(), times(1)).transportType(transportType); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).transportType(transportType); } } diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusProcessorClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusProcessorClientBuilderFactoryTests.java index 74c2447732e1..8ffec8e890b9 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusProcessorClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusProcessorClientBuilderFactoryTests.java @@ -17,6 +17,7 @@ import java.time.Duration; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -121,7 +122,7 @@ void verifyServicePropertiesConfigured(boolean isShareServiceClientBuilder) { verify(builder, times(1)).disableAutoComplete(); verify(builder, times(1)).maxConcurrentCalls(10); - verify(factory.getServiceBusClientBuilder(), times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } private ServiceBusProcessorClientTestProperties getServiceBusProcessorClientTestProperties(boolean isShareServiceClientBuilder) { diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusReceiverClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusReceiverClientBuilderFactoryTests.java index 028516bf94c4..0095c9cddf59 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusReceiverClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusReceiverClientBuilderFactoryTests.java @@ -13,6 +13,7 @@ import java.time.Duration; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -71,7 +72,7 @@ void verifyServicePropertiesConfigured(boolean isShareServiceClientBuilder) { verify(builder, times(1)).maxAutoLockRenewDuration(Duration.ofSeconds(5)); verify(builder, times(1)).disableAutoComplete(); - verify(factory.getServiceBusClientBuilder(), times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } private ServiceBusReceiverClientTestProperties getServiceBusReceiverClientTestProperties(boolean isShareServiceClientBuilder) { diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSenderClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSenderClientBuilderFactoryTests.java index 45e9d979358c..06dd1c00391e 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSenderClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSenderClientBuilderFactoryTests.java @@ -8,6 +8,7 @@ import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -65,7 +66,7 @@ void verifyServicePropertiesConfigured(boolean isShareServiceClientBuilder) { verify(factory.getServiceBusClientBuilder(), times(1)).customEndpointAddress(customEndpoint); verify(builder, times(1)).topicName("test-topic"); - verify(factory.getServiceBusClientBuilder(), times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } @Override diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionProcessorClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionProcessorClientBuilderFactoryTests.java index a0bcdbd2bb3f..f4ebdedb9a20 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionProcessorClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionProcessorClientBuilderFactoryTests.java @@ -19,6 +19,7 @@ import java.util.Map; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -136,7 +137,7 @@ void verifyServicePropertiesConfigured(boolean isShareServiceClientBuilder) { verify(builder, times(1)).maxConcurrentCalls(10); verify(builder, times(1)).maxConcurrentSessions(20); - verify(factory.getServiceBusClientBuilder(), times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } private ServiceBusProcessorClientTestProperties getServiceBusProcessorClientTestProperties(boolean isShareServiceClientBuilder) { diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionReceiverClientBuilderFactoryTests.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionReceiverClientBuilderFactoryTests.java index 7bf0532e45eb..1371833ef52a 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionReceiverClientBuilderFactoryTests.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/factory/ServiceBusSessionReceiverClientBuilderFactoryTests.java @@ -13,6 +13,7 @@ import java.time.Duration; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -72,7 +73,7 @@ void verifyServicePropertiesConfigured(boolean isShareServiceClientBuilder) { verify(builder, times(1)).maxAutoLockRenewDuration(Duration.ofSeconds(5)); verify(builder, times(1)).disableAutoComplete(); - verify(factory.getServiceBusClientBuilder(), times(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); + verify(factory.getServiceBusClientBuilder(), atLeast(1)).fullyQualifiedNamespace(properties.getFullyQualifiedNamespace()); } private ServiceBusReceiverClientTestProperties getServiceBusReceiverClientTestProperties(boolean isShareServiceClientBuilder) { diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfiguration.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfiguration.java index 5c8773fc0f53..27345f9917ed 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfiguration.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -118,21 +119,23 @@ EventHubsMessageChannelBinder eventHubBinder(EventHubsChannelProvisioner channel @Bean @ConditionalOnMissingBean EventHubsProducerFactoryCustomizer defaultEventHubsProducerFactoryCustomizer( + ApplicationContext applicationContext, AzureTokenCredentialResolver azureTokenCredentialResolver, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential, ObjectProvider> clientBuilderCustomizers) { - return new DefaultProducerFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers); + return new DefaultProducerFactoryCustomizer(applicationContext, defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers); } @Bean @ConditionalOnMissingBean EventHubsProcessorFactoryCustomizer defaultEventHubsProcessorFactoryCustomizer( + ApplicationContext applicationContext, AzureTokenCredentialResolver azureTokenCredentialResolver, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultCredential, ObjectProvider> processorClientBuilderCustomizers) { - return new DefaultProcessorFactoryCustomizer(defaultCredential, azureTokenCredentialResolver, processorClientBuilderCustomizers); + return new DefaultProcessorFactoryCustomizer(applicationContext, defaultCredential, azureTokenCredentialResolver, processorClientBuilderCustomizers); } /** @@ -140,13 +143,16 @@ EventHubsProcessorFactoryCustomizer defaultEventHubsProcessorFactoryCustomizer( */ static class DefaultProducerFactoryCustomizer implements EventHubsProducerFactoryCustomizer { + private final ApplicationContext applicationContext; private final TokenCredential defaultCredential; private final AzureTokenCredentialResolver tokenCredentialResolver; private final ObjectProvider> clientBuilderCustomizers; - DefaultProducerFactoryCustomizer(TokenCredential defaultCredential, + DefaultProducerFactoryCustomizer(ApplicationContext applicationContext, + TokenCredential defaultCredential, AzureTokenCredentialResolver azureTokenCredentialResolver, ObjectProvider> clientBuilderCustomizers) { + this.applicationContext = applicationContext; this.defaultCredential = defaultCredential; this.tokenCredentialResolver = azureTokenCredentialResolver; this.clientBuilderCustomizers = clientBuilderCustomizers; @@ -158,6 +164,7 @@ public void customize(EventHubsProducerFactory factory) { DefaultEventHubsNamespaceProducerFactory defaultFactory = (DefaultEventHubsNamespaceProducerFactory) factory; + defaultFactory.setApplicationContext(applicationContext); defaultFactory.setDefaultCredential(defaultCredential); defaultFactory.setTokenCredentialResolver(tokenCredentialResolver); clientBuilderCustomizers.orderedStream().forEach(defaultFactory::addBuilderCustomizer); @@ -174,13 +181,16 @@ ObjectProvider> getCl */ static class DefaultProcessorFactoryCustomizer implements EventHubsProcessorFactoryCustomizer { + private final ApplicationContext applicationContext; private final TokenCredential defaultCredential; private final AzureTokenCredentialResolver tokenCredentialResolver; private final ObjectProvider> processorClientBuilderCustomizers; - DefaultProcessorFactoryCustomizer(TokenCredential defaultCredential, + DefaultProcessorFactoryCustomizer(ApplicationContext applicationContext, + TokenCredential defaultCredential, AzureTokenCredentialResolver azureTokenCredentialResolver, ObjectProvider> processorClientBuilderCustomizers) { + this.applicationContext = applicationContext; this.defaultCredential = defaultCredential; this.tokenCredentialResolver = azureTokenCredentialResolver; this.processorClientBuilderCustomizers = processorClientBuilderCustomizers; @@ -192,6 +202,7 @@ public void customize(EventHubsProcessorFactory factory) { DefaultEventHubsNamespaceProcessorFactory defaultFactory = (DefaultEventHubsNamespaceProcessorFactory) factory; + defaultFactory.setApplicationContext(applicationContext); defaultFactory.setDefaultCredential(defaultCredential); defaultFactory.setTokenCredentialResolver(tokenCredentialResolver); processorClientBuilderCustomizers.orderedStream().forEach(defaultFactory::addBuilderCustomizer); diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfigurationTests.java index cc620a3b5de6..2d801815e159 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/test/java/com/azure/spring/cloud/stream/binder/eventhubs/implementation/config/EventHubsBinderConfigurationTests.java @@ -3,10 +3,13 @@ package com.azure.spring.cloud.stream.binder.eventhubs.implementation.config; +import com.azure.core.credential.TokenCredential; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerAsyncClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties; +import com.azure.spring.cloud.core.credential.AzureCredentialResolver; import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer; import com.azure.spring.cloud.resourcemanager.implementation.provisioning.EventHubsProvisioner; import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer; @@ -19,6 +22,8 @@ import com.azure.spring.cloud.stream.binder.eventhubs.core.implementation.provisioning.EventHubsChannelProvisioner; import com.azure.spring.cloud.stream.binder.eventhubs.implementation.provisioning.EventHubsChannelResourceManagerProvisioner; import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; +import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProducerFactory; +import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; @@ -32,6 +37,7 @@ import org.springframework.cloud.stream.binder.Binder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.test.util.ReflectionTestUtils; import java.time.Duration; import java.time.Instant; @@ -93,9 +99,6 @@ void shouldConfigureArmChannelProvisionerWhenResourceManagerProvided() { }); } - // conniey: Remove warning suppression when azure-messaging-eventhubs is updated to 5.21.0. - // https://github.com/Azure/azure-sdk-for-java/issues/46359 - @SuppressWarnings("deprecation") @Test void testExtendedBindingPropertiesShouldBind() { String producerConnectionString = String.format(CONNECTION_STRING_FORMAT, "fake-producer-namespace"); @@ -281,4 +284,92 @@ public void customize(Object builder) { } } + @Test + void testCustomTokenCredentialConfiguration() { + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); + + this.contextRunner + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) + .withPropertyValues( + "spring.cloud.azure.eventhubs.connection-string=" + connectionString, + "spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential" + ) + .run(context -> { + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); + AzureEventHubsProperties props = context.getBean(AzureEventHubsProperties.class); + EventHubsProducerFactoryCustomizer customizer = context.getBean(EventHubsProducerFactoryCustomizer.class); + + // Create producer factory and apply customizer + DefaultEventHubsNamespaceProducerFactory producerFactory = new DefaultEventHubsNamespaceProducerFactory( + context.getBean(NamespaceProperties.class)); + customizer.customize(producerFactory); + + // Validate credential resolution - without ApplicationContext propagation fix, + // tokenCredentialBeanName would be silently ignored + assertThat(resolveCredential(producerFactory, props)) + .isSameAs(customCredential); + + // Validate runtime producer creation + EventHubProducerAsyncClient producer = producerFactory.createProducer("test-eventhub"); + producer.close(); + }); + } + + @Test + void testCustomTokenCredentialConfigurationWithBinder() { + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); + + this.contextRunner + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) + .withBean(CheckpointStore.class, () -> mock(CheckpointStore.class)) + .withPropertyValues( + "spring.cloud.azure.eventhubs.connection-string=" + connectionString, + "spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential", + "spring.cloud.azure.eventhubs.namespace=test-namespace" + ) + .run(context -> { + EventHubsMessageChannelBinder binder = context.getBean(EventHubsMessageChannelBinder.class); + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); + AzureEventHubsProperties props = context.getBean(AzureEventHubsProperties.class); + + // Get producer factory from binder + EventHubsTemplate eventHubsTemplate = ReflectionTestUtils.invokeMethod(binder, "getEventHubTemplate"); + DefaultEventHubsNamespaceProducerFactory producerFactory = + (DefaultEventHubsNamespaceProducerFactory) ReflectionTestUtils.getField(eventHubsTemplate, "producerFactory"); + + // Get processor factory from binder + Object processorFactory = ReflectionTestUtils.invokeMethod(binder, "getProcessorFactory"); + + // Validate credential resolution for both factories + assertThat(resolveCredential(producerFactory, props)) + .isSameAs(customCredential); + assertThat(resolveCredential(processorFactory, props)) + .isSameAs(customCredential); + + // Validate runtime producer creation + EventHubProducerAsyncClient producer = producerFactory.createProducer("test-eventhub"); + producer.close(); + }); + } + + @SuppressWarnings("unchecked") + private TokenCredential resolveCredential(Object factory, AzureEventHubsProperties properties) { + try { + java.lang.reflect.Field field = factory.getClass().getDeclaredField("tokenCredentialResolver"); + field.setAccessible(true); + AzureCredentialResolver resolver = (AzureCredentialResolver) field.get(factory); + return resolver.resolve(properties); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Configuration + public static class CustomTokenCredentialConfiguration { + @Bean + public TokenCredential customTokenCredential() { + return mock(TokenCredential.class); + } + } + } diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java index 4e26622665ae..015d30383c1d 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java @@ -33,6 +33,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -119,12 +120,13 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha @Bean @ConditionalOnMissingBean ServiceBusProducerFactoryCustomizer defaultServiceBusProducerFactoryCustomizer( + ApplicationContext applicationContext, AzureTokenCredentialResolver azureTokenCredentialResolver, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential, ObjectProvider> clientBuilderCustomizers, ObjectProvider> senderClientBuilderCustomizers) { - return new DefaultProducerFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, + return new DefaultProducerFactoryCustomizer(applicationContext, defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers, senderClientBuilderCustomizers); } @@ -132,13 +134,14 @@ ServiceBusProducerFactoryCustomizer defaultServiceBusProducerFactoryCustomizer( @Bean @ConditionalOnMissingBean ServiceBusProcessorFactoryCustomizer defaultServiceBusProcessorFactoryCustomizer( + ApplicationContext applicationContext, AzureTokenCredentialResolver azureTokenCredentialResolver, @Qualifier(DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME) TokenCredential defaultAzureCredential, ObjectProvider> clientBuilderCustomizers, ObjectProvider> processorClientBuilderCustomizers, ObjectProvider> sessionProcessorClientBuilderCustomizers) { - return new DefaultProcessorFactoryCustomizer(defaultAzureCredential, azureTokenCredentialResolver, + return new DefaultProcessorFactoryCustomizer(applicationContext, defaultAzureCredential, azureTokenCredentialResolver, clientBuilderCustomizers, processorClientBuilderCustomizers, sessionProcessorClientBuilderCustomizers); @@ -149,15 +152,18 @@ ServiceBusProcessorFactoryCustomizer defaultServiceBusProcessorFactoryCustomizer */ static class DefaultProducerFactoryCustomizer implements ServiceBusProducerFactoryCustomizer { + private final ApplicationContext applicationContext; private final TokenCredential defaultCredential; private final AzureTokenCredentialResolver tokenCredentialResolver; private final ObjectProvider> clientBuilderCustomizers; private final ObjectProvider> senderClientBuilderCustomizers; - DefaultProducerFactoryCustomizer(TokenCredential defaultCredential, + DefaultProducerFactoryCustomizer(ApplicationContext applicationContext, + TokenCredential defaultCredential, AzureTokenCredentialResolver azureTokenCredentialResolver, ObjectProvider> clientBuilderCustomizers, ObjectProvider> senderClientBuilderCustomizers) { + this.applicationContext = applicationContext; this.defaultCredential = defaultCredential; this.tokenCredentialResolver = azureTokenCredentialResolver; this.clientBuilderCustomizers = clientBuilderCustomizers; @@ -170,6 +176,7 @@ public void customize(ServiceBusProducerFactory factory) { DefaultServiceBusNamespaceProducerFactory defaultFactory = (DefaultServiceBusNamespaceProducerFactory) factory; + defaultFactory.setApplicationContext(applicationContext); defaultFactory.setDefaultCredential(defaultCredential); defaultFactory.setTokenCredentialResolver(tokenCredentialResolver); clientBuilderCustomizers.orderedStream().forEach(defaultFactory::addServiceBusClientBuilderCustomizer); @@ -192,17 +199,20 @@ ObjectProvider> clientBuilderCustomizers; private final ObjectProvider> processorClientBuilderCustomizers; private final ObjectProvider> sessionProcessorClientBuilderCustomizers; - DefaultProcessorFactoryCustomizer(TokenCredential defaultCredential, + DefaultProcessorFactoryCustomizer(ApplicationContext applicationContext, + TokenCredential defaultCredential, AzureTokenCredentialResolver azureTokenCredentialResolver, ObjectProvider> clientBuilderCustomizers, ObjectProvider> processorClientBuilderCustomizers, ObjectProvider> sessionProcessorClientBuilderCustomizers) { + this.applicationContext = applicationContext; this.defaultCredential = defaultCredential; this.tokenCredentialResolver = azureTokenCredentialResolver; this.clientBuilderCustomizers = clientBuilderCustomizers; @@ -213,6 +223,7 @@ static class DefaultProcessorFactoryCustomizer implements ServiceBusProcessorFac @Override public void customize(ServiceBusProcessorFactory factory) { if (factory instanceof DefaultServiceBusNamespaceProcessorFactory defaultFactory) { + defaultFactory.setApplicationContext(applicationContext); defaultFactory.setDefaultCredential(defaultCredential); defaultFactory.setTokenCredentialResolver(tokenCredentialResolver); clientBuilderCustomizers.orderedStream().forEach(defaultFactory::addServiceBusClientBuilderCustomizer); diff --git a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java index 1611bf16157a..1510974d625b 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/test/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfigurationTests.java @@ -3,26 +3,36 @@ package com.azure.spring.cloud.stream.binder.servicebus.implementation.config; +import com.azure.core.credential.TokenCredential; import com.azure.messaging.servicebus.ServiceBusClientBuilder; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties; +import com.azure.spring.cloud.core.credential.AzureCredentialResolver; import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer; import com.azure.spring.cloud.resourcemanager.implementation.provisioning.ServiceBusProvisioner; import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProcessorFactoryCustomizer; import com.azure.spring.cloud.stream.binder.servicebus.config.ServiceBusProducerFactoryCustomizer; -import com.azure.spring.cloud.stream.binder.servicebus.implementation.ServiceBusMessageChannelBinder; +import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner; import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties; import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties; import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties; -import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner; +import com.azure.spring.cloud.stream.binder.servicebus.implementation.ServiceBusMessageChannelBinder; import com.azure.spring.cloud.stream.binder.servicebus.implementation.provisioning.ServiceBusChannelResourceManagerProvisioner; +import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory; +import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; +import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties; import com.azure.spring.messaging.servicebus.implementation.support.converter.ServiceBusMessageConverter; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.cloud.stream.binder.Binder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.test.util.ReflectionTestUtils; +import java.lang.reflect.Field; import java.time.Duration; import static com.azure.messaging.servicebus.models.SubQueue.DEAD_LETTER_QUEUE; @@ -255,4 +265,81 @@ public void customize(Object builder) { } } + @Test + void testCustomTokenCredentialConfiguration() { + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); + + this.contextRunner + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) + .withPropertyValues( + "spring.cloud.azure.servicebus.connection-string=" + connectionString, + "spring.cloud.azure.servicebus.credential.token-credential-bean-name=customTokenCredential" + ) + .run(context -> { + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); + AzureServiceBusProperties serviceBusProperties = context.getBean(AzureServiceBusProperties.class); + + assertThat(serviceBusProperties.getCredential().getTokenCredentialBeanName()) + .isEqualTo("customTokenCredential"); + + ServiceBusProducerFactoryCustomizer customizer = context.getBean(ServiceBusProducerFactoryCustomizer.class); + DefaultServiceBusNamespaceProducerFactory producerFactory = + new DefaultServiceBusNamespaceProducerFactory(context.getBean(NamespaceProperties.class)); + customizer.customize(producerFactory); + + assertThat(resolveCredential(producerFactory, serviceBusProperties)) + .isSameAs(customCredential); + + producerFactory.createProducer("test-queue", ServiceBusEntityType.QUEUE).close(); + }); + } + + @Test + void testCustomTokenCredentialConfigurationWithBinder() { + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); + + this.contextRunner + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) + .withPropertyValues( + "spring.cloud.azure.servicebus.connection-string=" + connectionString, + "spring.cloud.azure.servicebus.credential.token-credential-bean-name=customTokenCredential" + ) + .run(context -> { + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); + AzureServiceBusProperties serviceBusProperties = context.getBean(AzureServiceBusProperties.class); + ServiceBusMessageChannelBinder binder = context.getBean(ServiceBusMessageChannelBinder.class); + + Object serviceBusTemplate = ReflectionTestUtils.invokeMethod(binder, "getServiceBusTemplate"); + Field producerFactoryField = serviceBusTemplate.getClass().getDeclaredField("producerFactory"); + producerFactoryField.setAccessible(true); + Object producerFactory = producerFactoryField.get(serviceBusTemplate); + + Object processorFactory = ReflectionTestUtils.invokeMethod(binder, "getProcessorFactory"); + Assertions.assertNotNull(processorFactory); + + assertThat(resolveCredential(producerFactory, serviceBusProperties)) + .isSameAs(customCredential); + assertThat(resolveCredential(processorFactory, serviceBusProperties)) + .isSameAs(customCredential); + + ((ServiceBusProducerFactory) producerFactory).createProducer("test-queue", ServiceBusEntityType.QUEUE).close(); + }); + } + + @SuppressWarnings("unchecked") + private TokenCredential resolveCredential(Object factory, AzureServiceBusProperties properties) throws Exception { + Field field = factory.getClass().getDeclaredField("tokenCredentialResolver"); + field.setAccessible(true); + AzureCredentialResolver resolver = (AzureCredentialResolver) field.get(factory); + return resolver.resolve(properties); + } + + @Configuration + public static class CustomTokenCredentialConfiguration { + @Bean + public TokenCredential customTokenCredential() { + return mock(TokenCredential.class); + } + } + } diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProcessorFactory.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProcessorFactory.java index 6b63c9d0536b..5c9a3e4b7aba 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProcessorFactory.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProcessorFactory.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationContext; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -55,6 +56,7 @@ public final class DefaultEventHubsNamespaceProcessorFactory implements EventHub private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubsNamespaceProcessorFactory.class); private final List listeners = new ArrayList<>(); + private ApplicationContext applicationContext; private final NamespaceProperties namespaceProperties; private final CheckpointStore checkpointStore; private final PropertiesSupplier propertiesSupplier; @@ -167,6 +169,7 @@ private EventProcessorClient doCreateProcessor(@NonNull String eventHub, @NonNul factory.setDefaultTokenCredential(this.defaultCredential); factory.setTokenCredentialResolver(this.tokenCredentialResolver); + factory.setApplicationContext(this.applicationContext); factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS); EventProcessorClientBuilder builder = factory.build(); @@ -246,4 +249,12 @@ private String getCustomizerKey(String eventHub, String consumerGroup) { return eventHub + "_" + consumerGroup; } + /** + * Set the application context. + * @param applicationContext the application context. + */ + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + } diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProducerFactory.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProducerFactory.java index bfc0635428e1..8cd6d5713b3f 100644 --- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProducerFactory.java +++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/DefaultEventHubsNamespaceProducerFactory.java @@ -15,6 +15,7 @@ import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties; import com.azure.spring.messaging.eventhubs.implementation.properties.merger.ProducerPropertiesParentMerger; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationContext; import org.springframework.lang.Nullable; import java.util.ArrayList; @@ -41,6 +42,7 @@ public final class DefaultEventHubsNamespaceProducerFactory implements EventHubsProducerFactory, DisposableBean { private final List listeners = new ArrayList<>(); + private ApplicationContext applicationContext; private final NamespaceProperties namespaceProperties; private final PropertiesSupplier propertiesSupplier; private final Map clients = new ConcurrentHashMap<>(); @@ -82,6 +84,7 @@ private EventHubProducerAsyncClient doCreateProducer(String eventHub, @Nullable factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS); factory.setTokenCredentialResolver(this.tokenCredentialResolver); factory.setDefaultTokenCredential(this.defaultCredential); + factory.setApplicationContext(this.applicationContext); EventHubClientBuilder builder = factory.build(); customizeBuilder(eventHub, builder); EventHubProducerAsyncClient producerClient = builder.buildAsyncProducerClient(); @@ -163,4 +166,12 @@ private void customizeBuilder(String eventHub, EventHubClientBuilder builder) { .forEach(customizer -> customizer.customize(builder)); } + /** + * Set the application context. + * @param applicationContext the application context. + */ + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + } diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceConsumerFactory.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceConsumerFactory.java index 1f907cae13f9..3d7802983cc1 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceConsumerFactory.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceConsumerFactory.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationContext; import org.springframework.lang.Nullable; import java.util.ArrayList; @@ -42,6 +43,7 @@ public final class DefaultServiceBusNamespaceConsumerFactory implements ServiceBusConsumerFactory, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusNamespaceConsumerFactory.class); private final List listeners = new ArrayList<>(); + private ApplicationContext applicationContext; private final NamespaceProperties namespaceProperties; private final PropertiesSupplier propertiesSupplier; private final Map clients = new ConcurrentHashMap<>(); @@ -122,6 +124,7 @@ private ServiceBusSessionReceiverClient doCreateReceiver(String name, @Nullable factory.setDefaultTokenCredential(this.defaultCredential); factory.setTokenCredentialResolver(this.tokenCredentialResolver); factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS); + factory.setApplicationContext(this.applicationContext); ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder = factory.build(); @@ -214,4 +217,11 @@ private void customizeBuilder(String entityName, ServiceBusClientBuilder.Service .forEach(customizer -> customizer.customize(builder)); } + /** + * Set the application context. + * @param applicationContext the application context. + */ + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } } diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProcessorFactory.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProcessorFactory.java index d0edbb3b874d..b59b5b90c960 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProcessorFactory.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProcessorFactory.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationContext; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -53,6 +54,7 @@ public final class DefaultServiceBusNamespaceProcessorFactory implements Service private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusNamespaceProcessorFactory.class); private final Map processorMap = new ConcurrentHashMap<>(); private final List listeners = new ArrayList<>(); + private ApplicationContext applicationContext; private final NamespaceProperties namespaceProperties; private final PropertiesSupplier propertiesSupplier; private final List> serviceBusClientBuilderCustomizers = new ArrayList<>(); @@ -180,6 +182,7 @@ private ServiceBusProcessorClient doCreateProcessor(String name, factory.setDefaultTokenCredential(this.defaultCredential); factory.setTokenCredentialResolver(this.tokenCredentialResolver); factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS); + factory.setApplicationContext(this.applicationContext); ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder builder = factory.build(); customizeBuilder(name, subscription, builder); @@ -192,6 +195,7 @@ private ServiceBusProcessorClient doCreateProcessor(String name, factory.setDefaultTokenCredential(this.defaultCredential); factory.setTokenCredentialResolver(this.tokenCredentialResolver); factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS); + factory.setApplicationContext(this.applicationContext); ServiceBusClientBuilder.ServiceBusProcessorClientBuilder builder = factory.build(); customizeBuilder(name, subscription, builder); @@ -259,7 +263,7 @@ public void addServiceBusClientBuilderCustomizer(AzureServiceClientBuilderCustom */ public void addBuilderCustomizer(AzureServiceClientBuilderCustomizer customizer) { if (customizer == null) { - LOGGER.debug("The provided '{}' customizer is null, will ignore it.", + LOGGER.debug("The provided '{}' customizer is null, will ignore it.", ServiceBusClientBuilder.ServiceBusProcessorClientBuilder.class.getName()); } else { this.customizers.add(customizer); @@ -274,7 +278,7 @@ public void addBuilderCustomizer(AzureServiceClientBuilderCustomizer customizer) { if (customizer == null) { - LOGGER.debug("The provided '{}' customizer is null, will ignore it.", + LOGGER.debug("The provided '{}' customizer is null, will ignore it.", ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder.class.getName()); } else { this.sessionCustomizers.add(customizer); @@ -342,4 +346,12 @@ private String buildProcessorName(ConsumerIdentifier k) { return k.getDestination() + "/" + (group == null ? "" : group); } + /** + * Set the application context. + * @param applicationContext the application context. + */ + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + } diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProducerFactory.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProducerFactory.java index deb22a075610..af5926c76caf 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProducerFactory.java +++ b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/DefaultServiceBusNamespaceProducerFactory.java @@ -16,6 +16,7 @@ import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties; import com.azure.spring.messaging.servicebus.implementation.properties.merger.SenderPropertiesParentMerger; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationContext; import org.springframework.lang.Nullable; import java.util.ArrayList; @@ -39,6 +40,7 @@ public final class DefaultServiceBusNamespaceProducerFactory implements ServiceBusProducerFactory, DisposableBean { private final List listeners = new ArrayList<>(); + private ApplicationContext applicationContext; private final NamespaceProperties namespaceProperties; private final PropertiesSupplier propertiesSupplier; private final Map clients = new ConcurrentHashMap<>(); @@ -117,6 +119,7 @@ private ServiceBusSenderAsyncClient doCreateProducer(String name, @Nullable Prod factory.setDefaultTokenCredential(this.defaultCredential); factory.setTokenCredentialResolver(this.tokenCredentialResolver); factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS); + factory.setApplicationContext(this.applicationContext); ServiceBusClientBuilder.ServiceBusSenderClientBuilder builder = factory.build(); customizeBuilder(name, builder); @@ -198,4 +201,11 @@ private void customizeBuilder(String entityName, ServiceBusClientBuilder.Service .forEach(customizer -> customizer.customize(builder)); } + /** + * Set the application context. + * @param applicationContext the application context. + */ + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } }