Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

package com.azure.spring.cloud.stream.binder.eventhubs.implementation.config;

import com.azure.core.credential.TokenCredential;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties;
import com.azure.spring.cloud.core.credential.AzureCredentialResolver;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.resourcemanager.implementation.provisioning.EventHubsProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer;
Expand All @@ -19,6 +21,8 @@
import com.azure.spring.cloud.stream.binder.eventhubs.core.implementation.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.implementation.provisioning.EventHubsChannelResourceManagerProvisioner;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
Expand All @@ -32,7 +36,9 @@
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.util.ReflectionTestUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;

Expand Down Expand Up @@ -93,9 +99,6 @@ void shouldConfigureArmChannelProvisionerWhenResourceManagerProvided() {
});
}

// conniey: Remove warning suppression when azure-messaging-eventhubs is updated to 5.21.0.
// https://github.com/Azure/azure-sdk-for-java/issues/46359
@SuppressWarnings("deprecation")
@Test
void testExtendedBindingPropertiesShouldBind() {
String producerConnectionString = String.format(CONNECTION_STRING_FORMAT, "fake-producer-namespace");
Expand Down Expand Up @@ -281,4 +284,116 @@ public void customize(Object builder) {
}
}

@Test
void testCustomTokenCredentialConfiguration() {
String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace");

this.contextRunner
.withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class))
.withPropertyValues(
"spring.cloud.azure.eventhubs.connection-string=" + connectionString,
"spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential"
)
.run(context -> {
// Verify that the custom token credential bean exists
assertThat(context).hasBean("customTokenCredential");
TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class);
assertThat(customCredential).isNotNull();

// Verify that the properties contain the correct credential bean name
AzureEventHubsProperties eventHubsProperties = context.getBean(AzureEventHubsProperties.class);
assertThat(eventHubsProperties).isNotNull();
assertThat(eventHubsProperties.getCredential()).isNotNull();
assertThat(eventHubsProperties.getCredential().getTokenCredentialBeanName())
.as("The token-credential-bean-name property should be set to customTokenCredential")
.isEqualTo("customTokenCredential");

// Verify the EventHubsProducerFactoryCustomizer is configured and can apply credential settings
assertThat(context).hasSingleBean(EventHubsProducerFactoryCustomizer.class);
EventHubsProducerFactoryCustomizer producerFactoryCustomizer =
context.getBean(EventHubsProducerFactoryCustomizer.class);
assertThat(producerFactoryCustomizer).isNotNull();

// Verify it's the default customizer with token credential resolver
assertThat(producerFactoryCustomizer)
.isInstanceOf(EventHubsBinderConfiguration.DefaultProducerFactoryCustomizer.class);
});
}

@Test
void testCustomTokenCredentialConfigurationWithBinder() {
String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace");

this.contextRunner
.withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class))
.withBean(CheckpointStore.class, () -> mock(CheckpointStore.class))
.withPropertyValues(
"spring.cloud.azure.eventhubs.connection-string=" + connectionString,
"spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential",
"spring.cloud.azure.eventhubs.namespace=test-namespace"
)
.run(context -> {
assertThat(context).hasSingleBean(EventHubsMessageChannelBinder.class);
EventHubsMessageChannelBinder binder = context.getBean(EventHubsMessageChannelBinder.class);

TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class);
AzureEventHubsProperties eventHubsProperties = context.getBean(AzureEventHubsProperties.class);

// Test Producer Factory
// Verify that credential resolver is properly configured in the producer factory created by binder
EventHubsTemplate eventHubsTemplate = ReflectionTestUtils.invokeMethod(binder, "getEventHubTemplate");
assertThat(eventHubsTemplate).isNotNull();

DefaultEventHubsNamespaceProducerFactory producerFactory = (DefaultEventHubsNamespaceProducerFactory) ReflectionTestUtils.getField(eventHubsTemplate, "producerFactory");
assertThat(producerFactory).isNotNull();

// Use reflection to access the tokenCredentialResolver field in producer factory
Field producerResolverField = producerFactory.getClass().getDeclaredField("tokenCredentialResolver");
producerResolverField.setAccessible(true);
Object producerResolver = producerResolverField.get(producerFactory);
Comment thread
rujche marked this conversation as resolved.
Outdated
assertThat(producerResolver)
.as("TokenCredentialResolver should be configured in the binder's producer factory")
.isNotNull();

// Verify that producer resolver can resolve the custom credential
@SuppressWarnings("unchecked")
AzureCredentialResolver<TokenCredential> typedProducerResolver =
(AzureCredentialResolver<TokenCredential>) producerResolver;
TokenCredential producerResolvedCredential = typedProducerResolver.resolve(eventHubsProperties);
assertThat(producerResolvedCredential)
.as("The resolved credential in binder's producer factory should be the customTokenCredential bean")
.isSameAs(customCredential);

// Test Processor Factory
// Get the ProcessorFactory through reflection (it's created lazily in getProcessorFactory)
Object processorFactory = ReflectionTestUtils.invokeMethod(binder, "getProcessorFactory");
assertThat(processorFactory).isNotNull();

// Use reflection to access the tokenCredentialResolver field in processor factory
Field processorResolverField = processorFactory.getClass().getDeclaredField("tokenCredentialResolver");
processorResolverField.setAccessible(true);
Object processorResolver = processorResolverField.get(processorFactory);
Comment thread
rujche marked this conversation as resolved.
Outdated
assertThat(processorResolver)
.as("TokenCredentialResolver should be configured in the binder's processor factory")
.isNotNull();

// Verify that processor resolver can resolve the custom credential
@SuppressWarnings("unchecked")
AzureCredentialResolver<TokenCredential> typedProcessorResolver =
(AzureCredentialResolver<TokenCredential>) processorResolver;
TokenCredential processorResolvedCredential = typedProcessorResolver.resolve(eventHubsProperties);
assertThat(processorResolvedCredential)
.as("The resolved credential in binder's processor factory should be the customTokenCredential bean")
.isSameAs(customCredential);
});
}

@Configuration
public static class CustomTokenCredentialConfiguration {
@Bean
public TokenCredential customTokenCredential() {
return mock(TokenCredential.class);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

package com.azure.spring.cloud.stream.binder.servicebus.implementation.config;

import com.azure.core.credential.TokenCredential;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties;
import com.azure.spring.cloud.core.credential.AzureCredentialResolver;
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
import com.azure.spring.cloud.resourcemanager.implementation.provisioning.ServiceBusProvisioner;
import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType;
Expand All @@ -16,13 +19,17 @@
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusProducerProperties;
import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner;
import com.azure.spring.cloud.stream.binder.servicebus.implementation.provisioning.ServiceBusChannelResourceManagerProvisioner;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.messaging.servicebus.implementation.support.converter.ServiceBusMessageConverter;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.util.ReflectionTestUtils;

import java.lang.reflect.Field;
import java.time.Duration;

import static com.azure.messaging.servicebus.models.SubQueue.DEAD_LETTER_QUEUE;
Expand Down Expand Up @@ -255,4 +262,124 @@ public void customize(Object builder) {
}
}

@Test
void testCustomTokenCredentialConfiguration() {
String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace");

this.contextRunner
.withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class))
.withPropertyValues(
"spring.cloud.azure.servicebus.connection-string=" + connectionString,
"spring.cloud.azure.servicebus.credential.token-credential-bean-name=customTokenCredential"
)
.run(context -> {
// Verify that the custom token credential bean exists
assertThat(context).hasBean("customTokenCredential");
TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class);
assertThat(customCredential).isNotNull();

// Verify that the properties contain the correct credential bean name
AzureServiceBusProperties serviceBusProperties = context.getBean(AzureServiceBusProperties.class);
assertThat(serviceBusProperties).isNotNull();
assertThat(serviceBusProperties.getCredential()).isNotNull();
assertThat(serviceBusProperties.getCredential().getTokenCredentialBeanName())
.as("The token-credential-bean-name property should be set to customTokenCredential")
.isEqualTo("customTokenCredential");

// Verify the ServiceBusProducerFactoryCustomizer is configured and can apply credential settings
assertThat(context).hasSingleBean(ServiceBusProducerFactoryCustomizer.class);
ServiceBusProducerFactoryCustomizer producerFactoryCustomizer =
context.getBean(ServiceBusProducerFactoryCustomizer.class);
assertThat(producerFactoryCustomizer).isNotNull();

// Verify it's the default customizer with token credential resolver
assertThat(producerFactoryCustomizer)
.isInstanceOf(ServiceBusBinderConfiguration.DefaultProducerFactoryCustomizer.class);
});
}

@Test
void testCustomTokenCredentialConfigurationWithBinder() {
String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace");

this.contextRunner
.withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class))
.withPropertyValues(
"spring.cloud.azure.servicebus.connection-string=" + connectionString,
"spring.cloud.azure.servicebus.credential.token-credential-bean-name=customTokenCredential"
)
.run(context -> {
// Verify that the custom token credential bean exists
assertThat(context).hasBean("customTokenCredential");
TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class);
assertThat(customCredential).isNotNull();

// Verify that the binder is created
assertThat(context).hasSingleBean(ServiceBusMessageChannelBinder.class);
ServiceBusMessageChannelBinder binder = context.getBean(ServiceBusMessageChannelBinder.class);
assertThat(binder).isNotNull();

// Test Producer Factory
// Get the ServiceBusTemplate through reflection (it's created lazily in getServiceBusTemplate)
Object serviceBusTemplate = ReflectionTestUtils.invokeMethod(binder, "getServiceBusTemplate");
assertThat(serviceBusTemplate).isNotNull();

// Get the producer factory from the template
Field producerFactoryField = serviceBusTemplate.getClass().getDeclaredField("producerFactory");
producerFactoryField.setAccessible(true);
Object producerFactory = producerFactoryField.get(serviceBusTemplate);
Comment thread
rujche marked this conversation as resolved.
assertThat(producerFactory).isInstanceOf(DefaultServiceBusNamespaceProducerFactory.class);

// Verify tokenCredentialResolver is configured in the producer factory created by binder
Field producerTokenCredentialResolverField =
producerFactory.getClass().getDeclaredField("tokenCredentialResolver");
producerTokenCredentialResolverField.setAccessible(true);
Object producerTokenCredentialResolver = producerTokenCredentialResolverField.get(producerFactory);
Comment thread
rujche marked this conversation as resolved.
Outdated
assertThat(producerTokenCredentialResolver)
.as("TokenCredentialResolver should be configured in the binder's producer factory")
.isNotNull();

// Verify it resolves to the custom credential
AzureServiceBusProperties serviceBusProperties = context.getBean(AzureServiceBusProperties.class);
@SuppressWarnings("unchecked")
AzureCredentialResolver<TokenCredential> producerResolver =
(AzureCredentialResolver<TokenCredential>) producerTokenCredentialResolver;
TokenCredential producerResolvedCredential = producerResolver.resolve(serviceBusProperties);
assertThat(producerResolvedCredential)
.as("The resolved credential in binder's producer factory should be the customTokenCredential bean")
.isSameAs(customCredential);

// Test Processor Factory
// Get the ProcessorFactory through reflection (it's created lazily in getProcessorFactory)
Object processorFactory = ReflectionTestUtils.invokeMethod(binder, "getProcessorFactory");
assertThat(processorFactory).isNotNull();

// Verify tokenCredentialResolver is configured in the processor factory created by binder
Field processorTokenCredentialResolverField =
processorFactory.getClass().getDeclaredField("tokenCredentialResolver");
processorTokenCredentialResolverField.setAccessible(true);
Object processorTokenCredentialResolver = processorTokenCredentialResolverField.get(processorFactory);
Comment thread
rujche marked this conversation as resolved.
Outdated
assertThat(processorTokenCredentialResolver)
.as("TokenCredentialResolver should be configured in the binder's processor factory")
.isNotNull();

// Verify it resolves to the custom credential
@SuppressWarnings("unchecked")
AzureCredentialResolver<TokenCredential> processorResolver =
(AzureCredentialResolver<TokenCredential>) processorTokenCredentialResolver;
TokenCredential processorResolvedCredential = processorResolver.resolve(serviceBusProperties);
assertThat(processorResolvedCredential)
.as("The resolved credential in binder's processor factory should be the customTokenCredential bean")
.isSameAs(customCredential);
});
}

@Configuration
public static class CustomTokenCredentialConfiguration {
@Bean
public TokenCredential customTokenCredential() {
return mock(TokenCredential.class);
}
}

}
Loading