Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add property to customize universe domain in Pub/Sub #3348

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ public PublisherFactory defaultPublisherFactory(
batchingSettings.ifAvailable(factory::setBatchingSettings);
factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering());
factory.setEndpoint(gcpPubSubProperties.getPublisher().getEndpoint());
factory.setUniverseDomain(gcpPubSubProperties.getPublisher().getUniverseDomain());

List<PublisherCustomizer> customizers = customizersProvider.orderedStream()
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,56 @@ void flowControlSettings_multipleKeysForSameSubscription_firstOneUsed(CapturedOu
});
}

@Test
void subscriberUniverseDomain_selectiveConfigurationSet() {
contextRunner
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);

assertThat(
gcpPubSubProperties.computeSubscriberUniverseDomain(
"subscription-name", projectIdProvider.getProjectId()))
.isEqualTo("example.com");
});
}

@Test
void subscriberUniverseDomain_globalAndSelectiveConfigurationSet_selectiveTakesPrecedence() {
contextRunner
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.universe-domain=example1.com",
"spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example2.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);

assertThat(
gcpPubSubProperties.computeSubscriberUniverseDomain(
"subscription-name", projectIdProvider.getProjectId()))
.isEqualTo("example2.com");
});
}

@Test
void publisherUniverseDomain() {
contextRunner
.withPropertyValues("spring.cloud.gcp.pubsub.publisher.universe-domain=example.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
CachingPublisherFactory publisherFactory =
ctx.getBean("defaultPublisherFactory", CachingPublisherFactory.class);
assertThat(gcpPubSubProperties.getPublisher().getUniverseDomain())
.isEqualTo("example.com");
assertThat(publisherFactory)
.hasFieldOrPropertyWithValue("delegate.universeDomain", "example.com");
});
}

@Configuration
static class CustomizerConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ public String computePullEndpoint(String subscriptionName, String projectId) {
return pullEndpoint != null ? pullEndpoint : this.globalSubscriber.getPullEndpoint();
}

/**
* Returns the universe domain. The subscription-specific property takes precedence if both global
* and subscription-specific properties are set. If subscription-specific configuration is not set
* then the global configuration is picked.
*
* @param subscriptionName subscription name
* @param projectId project id
* @return pull endpoint
*/
public String computeSubscriberUniverseDomain(String subscriptionName, String projectId) {
String universeDomain =
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getUniverseDomain();
return universeDomain != null ? universeDomain : this.globalSubscriber.getUniverseDomain();
}

/**
* Computes the retry settings. The subscription-specific property takes precedence if both global
* and subscription-specific properties are set. If subscription-specific settings are not set
Expand Down Expand Up @@ -384,6 +400,8 @@ public static class Publisher {
/** Set publisher endpoint. Example: "us-east1-pubsub.googleapis.com:443". */
private String endpoint;

private String universeDomain;

public Batching getBatching() {
return this.batching;
}
Expand Down Expand Up @@ -441,6 +459,14 @@ public String getEndpoint() {
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public String getUniverseDomain() {
return universeDomain;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}
}

/** Subscriber settings. */
Expand Down Expand Up @@ -487,6 +513,12 @@ public static class Subscriber {
/** RPC status codes that should be retried when pulling messages. */
private Code[] retryableCodes = null;

/**
* Universe domain of the client which is part of the endpoint that is formatted as
* `${service}.${universeDomain}:${port}`.
*/
private String universeDomain;

public String getFullyQualifiedName() {
return fullyQualifiedName;
}
Expand Down Expand Up @@ -571,6 +603,14 @@ public int getMaxAcknowledgementThreads() {
public void setMaxAcknowledgementThreads(int maxAcknowledgementThreads) {
this.maxAcknowledgementThreads = maxAcknowledgementThreads;
}

public String getUniverseDomain() {
return universeDomain;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}
}

/** Health Check settings. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class DefaultPublisherFactory implements PublisherFactory {

private String endpoint;

private String universeDomain;

private List<PublisherCustomizer> customizers;

/**
Expand Down Expand Up @@ -151,6 +153,10 @@ public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}

/**
* Accepts a list of {@link Publisher.Builder} customizers.
* The customizers are applied in the order provided, so the later customizers can override
Expand Down Expand Up @@ -222,6 +228,10 @@ void applyPublisherSettings(Publisher.Builder publisherBuilder) {
if (this.endpoint != null) {
publisherBuilder.setEndpoint(this.endpoint);
}

if (this.universeDomain != null) {
publisherBuilder.setUniverseDomain(this.universeDomain);
}
}

void applyCustomizers(Publisher.Builder publisherBuilder, String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

private String pullEndpoint;

private String universeDomain;

private ApiClock apiClock;

private RetrySettings subscriberStubRetrySettings;
Expand Down Expand Up @@ -300,6 +302,12 @@ public Subscriber createSubscriber(String subscriptionName, MessageReceiver rece
subscriberBuilder.setParallelPullCount(pullCount);
}

String universeDomain = getUniverseDomain(subscriptionName);
if (universeDomain != null) {
subscriberBuilder.setUniverseDomain(universeDomain);
}


Subscriber subscriber = subscriberBuilder.build();

if (shouldAddToHealthCheck) {
Expand Down Expand Up @@ -557,6 +565,13 @@ public Code[] getRetryableCodes(String subscriptionName) {
return this.pubSubConfiguration.computeRetryableCodes(subscriptionName, projectId);
}

String getUniverseDomain(String subscriptionName) {
if (this.universeDomain != null) {
return this.universeDomain;
}
return this.pubSubConfiguration.computeSubscriberUniverseDomain(subscriptionName, projectId);
}

public void setExecutorProviderMap(Map<ProjectSubscriptionName, ExecutorProvider> executorProviderMap) {
this.executorProviderMap = executorProviderMap;
}
Expand All @@ -582,6 +597,10 @@ public void setRetrySettingsMap(Map<ProjectSubscriptionName, RetrySettings> retr
this.retrySettingsMap = retrySettingsMap;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}

public void setGlobalRetrySettings(RetrySettings retrySettings) {
this.globalRetrySettings = retrySettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ void testCreateSubscriber_validateSetProperties() {
when(mockPubSubConfiguration.computePullEndpoint(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn("test.endpoint");
when(mockPubSubConfiguration.computeSubscriberUniverseDomain(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn("example.com");

Subscriber expectedSubscriber =
factory.createSubscriber("defaultSubscription", (message, consumer) -> {});
Expand All @@ -385,7 +388,8 @@ void testCreateSubscriber_validateSetProperties() {
.hasFieldOrPropertyWithValue("minDurationPerAckExtension", Duration.ofSeconds(3L))
.hasFieldOrPropertyWithValue("maxDurationPerAckExtension", Duration.ofSeconds(4L))
.hasFieldOrPropertyWithValue("numPullers", 2)
.hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint");
.hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint")
.hasFieldOrPropertyWithValue("subStubSettings.universeDomain", "example.com");
}

@Test
Expand Down
Loading