diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java index 0dab08655c..d8c3d30162 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfiguration.java @@ -373,6 +373,7 @@ public PublisherFactory defaultPublisherFactory( batchingSettings.ifAvailable(factory::setBatchingSettings); factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering()); factory.setEndpoint(gcpPubSubProperties.getPublisher().getEndpoint()); + factory.setUniverseDomain(gcpPubSubProperties.getPublisher().getUniverseDomain()); List customizers = customizersProvider.orderedStream() .collect(Collectors.toList()); diff --git a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java index 86a16d8621..f38a6d625f 100644 --- a/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java +++ b/spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java @@ -1412,6 +1412,56 @@ void flowControlSettings_multipleKeysForSameSubscription_firstOneUsed(CapturedOu }); } + @Test + void subscriberUniverseDomain_selectiveConfigurationSet() { + contextRunner + .withPropertyValues( + "spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example.com") + .run( + ctx -> { + GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class); + GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class); + + assertThat( + gcpPubSubProperties.computeSubscriberUniverseDomain( + "subscription-name", projectIdProvider.getProjectId())) + .isEqualTo("example.com"); + }); + } + + @Test + void subscriberUniverseDomain_globalAndSelectiveConfigurationSet_selectiveTakesPrecedence() { + contextRunner + .withPropertyValues( + "spring.cloud.gcp.pubsub.subscriber.universe-domain=example1.com", + "spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example2.com") + .run( + ctx -> { + GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class); + GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class); + + assertThat( + gcpPubSubProperties.computeSubscriberUniverseDomain( + "subscription-name", projectIdProvider.getProjectId())) + .isEqualTo("example2.com"); + }); + } + + @Test + void publisherUniverseDomain() { + contextRunner + .withPropertyValues("spring.cloud.gcp.pubsub.publisher.universe-domain=example.com") + .run( + ctx -> { + GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class); + CachingPublisherFactory publisherFactory = + ctx.getBean("defaultPublisherFactory", CachingPublisherFactory.class); + assertThat(gcpPubSubProperties.getPublisher().getUniverseDomain()) + .isEqualTo("example.com"); + assertThat(publisherFactory) + .hasFieldOrPropertyWithValue("delegate.universeDomain", "example.com"); + }); + } @Configuration static class CustomizerConfig { diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java index 6934eced90..4faec0f4c7 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java @@ -291,6 +291,22 @@ public String computePullEndpoint(String subscriptionName, String projectId) { return pullEndpoint != null ? pullEndpoint : this.globalSubscriber.getPullEndpoint(); } + /** + * Returns the universe domain. The subscription-specific property takes precedence if both global + * and subscription-specific properties are set. If subscription-specific configuration is not set + * then the global configuration is picked. + * + * @param subscriptionName subscription name + * @param projectId project id + * @return pull endpoint + */ + public String computeSubscriberUniverseDomain(String subscriptionName, String projectId) { + String universeDomain = + getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId)) + .getUniverseDomain(); + return universeDomain != null ? universeDomain : this.globalSubscriber.getUniverseDomain(); + } + /** * Computes the retry settings. The subscription-specific property takes precedence if both global * and subscription-specific properties are set. If subscription-specific settings are not set @@ -384,6 +400,8 @@ public static class Publisher { /** Set publisher endpoint. Example: "us-east1-pubsub.googleapis.com:443". */ private String endpoint; + private String universeDomain; + public Batching getBatching() { return this.batching; } @@ -441,6 +459,14 @@ public String getEndpoint() { public void setEndpoint(String endpoint) { this.endpoint = endpoint; } + + public String getUniverseDomain() { + return universeDomain; + } + + public void setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + } } /** Subscriber settings. */ @@ -487,6 +513,12 @@ public static class Subscriber { /** RPC status codes that should be retried when pulling messages. */ private Code[] retryableCodes = null; + /** + * Universe domain of the client which is part of the endpoint that is formatted as + * `${service}.${universeDomain}:${port}`. + */ + private String universeDomain; + public String getFullyQualifiedName() { return fullyQualifiedName; } @@ -571,6 +603,14 @@ public int getMaxAcknowledgementThreads() { public void setMaxAcknowledgementThreads(int maxAcknowledgementThreads) { this.maxAcknowledgementThreads = maxAcknowledgementThreads; } + + public String getUniverseDomain() { + return universeDomain; + } + + public void setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + } } /** Health Check settings. */ diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java index 5b5c1d62fb..7192335b58 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultPublisherFactory.java @@ -56,6 +56,8 @@ public class DefaultPublisherFactory implements PublisherFactory { private String endpoint; + private String universeDomain; + private List customizers; /** @@ -151,6 +153,10 @@ public void setEndpoint(String endpoint) { this.endpoint = endpoint; } + public void setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + } + /** * Accepts a list of {@link Publisher.Builder} customizers. * The customizers are applied in the order provided, so the later customizers can override @@ -222,6 +228,10 @@ void applyPublisherSettings(Publisher.Builder publisherBuilder) { if (this.endpoint != null) { publisherBuilder.setEndpoint(this.endpoint); } + + if (this.universeDomain != null) { + publisherBuilder.setUniverseDomain(this.universeDomain); + } } void applyCustomizers(Publisher.Builder publisherBuilder, String topic) { diff --git a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactory.java b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactory.java index f0f08ab6f0..1ba9347b97 100644 --- a/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactory.java +++ b/spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactory.java @@ -70,6 +70,8 @@ public class DefaultSubscriberFactory implements SubscriberFactory { private String pullEndpoint; + private String universeDomain; + private ApiClock apiClock; private RetrySettings subscriberStubRetrySettings; @@ -300,6 +302,12 @@ public Subscriber createSubscriber(String subscriptionName, MessageReceiver rece subscriberBuilder.setParallelPullCount(pullCount); } + String universeDomain = getUniverseDomain(subscriptionName); + if (universeDomain != null) { + subscriberBuilder.setUniverseDomain(universeDomain); + } + + Subscriber subscriber = subscriberBuilder.build(); if (shouldAddToHealthCheck) { @@ -557,6 +565,13 @@ public Code[] getRetryableCodes(String subscriptionName) { return this.pubSubConfiguration.computeRetryableCodes(subscriptionName, projectId); } + String getUniverseDomain(String subscriptionName) { + if (this.universeDomain != null) { + return this.universeDomain; + } + return this.pubSubConfiguration.computeSubscriberUniverseDomain(subscriptionName, projectId); + } + public void setExecutorProviderMap(Map executorProviderMap) { this.executorProviderMap = executorProviderMap; } @@ -582,6 +597,10 @@ public void setRetrySettingsMap(Map retr this.retrySettingsMap = retrySettingsMap; } + public void setUniverseDomain(String universeDomain) { + this.universeDomain = universeDomain; + } + public void setGlobalRetrySettings(RetrySettings retrySettings) { this.globalRetrySettings = retrySettings; } diff --git a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactoryTests.java b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactoryTests.java index 786c25791d..67f3279c94 100644 --- a/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactoryTests.java +++ b/spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactoryTests.java @@ -374,6 +374,9 @@ void testCreateSubscriber_validateSetProperties() { when(mockPubSubConfiguration.computePullEndpoint( "defaultSubscription", projectIdProvider.getProjectId())) .thenReturn("test.endpoint"); + when(mockPubSubConfiguration.computeSubscriberUniverseDomain( + "defaultSubscription", projectIdProvider.getProjectId())) + .thenReturn("example.com"); Subscriber expectedSubscriber = factory.createSubscriber("defaultSubscription", (message, consumer) -> {}); @@ -385,7 +388,8 @@ void testCreateSubscriber_validateSetProperties() { .hasFieldOrPropertyWithValue("minDurationPerAckExtension", Duration.ofSeconds(3L)) .hasFieldOrPropertyWithValue("maxDurationPerAckExtension", Duration.ofSeconds(4L)) .hasFieldOrPropertyWithValue("numPullers", 2) - .hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint"); + .hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint") + .hasFieldOrPropertyWithValue("subStubSettings.universeDomain", "example.com"); } @Test