Skip to content

Commit

Permalink
Adds support for custom endpoints (#18934)
Browse files Browse the repository at this point in the history
* Exposing customEndpointAddress in Event HubClientBuilder

* Add CHANGELOG entry.

* Remove verifyZeroInteractions

* Adding sample.
  • Loading branch information
conniey authored Feb 5, 2021
1 parent e34b2cb commit 03ce19a
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 23 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 5.5.0-beta.1 (Unreleased)
- Use `BinaryData` in `EventData`.
- Expose `EventHubsClientBuilder.customEndpointAddress` to support connecting to an intermediary before Azure Event
Hubs.

## 5.4.0 (2021-01-14)
### New features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import reactor.core.scheduler.Schedulers;

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -144,6 +146,7 @@ public class EventHubClientBuilder {
private Integer prefetchCount;
private ClientOptions clientOptions;
private SslDomain.VerifyMode verifyMode;
private URL customEndpointAddress;

/**
* Keeps track of the open clients that were created from this builder when there is a shared connection.
Expand Down Expand Up @@ -266,6 +269,33 @@ public EventHubClientBuilder configuration(Configuration configuration) {
this.configuration = configuration;
return this;
}
/**
* Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network
* does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through
* an intermediary. For example: {@literal https://my.custom.endpoint.com:55300}.
* <p>
* If no port is specified, the default port for the {@link #transportType(AmqpTransportType) transport type} is
* used.
*
* @param customEndpointAddress The custom endpoint address.
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code customEndpointAddress} cannot be parsed into a valid {@link URL}.
*/
public EventHubClientBuilder customEndpointAddress(String customEndpointAddress) {
if (customEndpointAddress == null) {
this.customEndpointAddress = null;
return this;
}

try {
this.customEndpointAddress = new URL(customEndpointAddress);
} catch (MalformedURLException e) {
throw logger.logExceptionAsError(
new IllegalArgumentException(customEndpointAddress + " : is not a valid URL.", e));
}

return this;
}

/**
* Toggles the builder to use the same connection for producers or consumers that are built from this instance. By
Expand Down Expand Up @@ -662,8 +692,14 @@ private ConnectionOptions getConnectionOptions() {
? verifyMode
: SslDomain.VerifyMode.VERIFY_PEER_NAME;

return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport, retryOptions,
proxyOptions, scheduler, options, verificationMode);
if (customEndpointAddress == null) {
return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport,
retryOptions, proxyOptions, scheduler, options, verificationMode);
} else {
return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport,
retryOptions, proxyOptions, scheduler, options, verificationMode, customEndpointAddress.getHost(),
customEndpointAddress.getPort());
}
}

private ProxyOptions getDefaultProxyConfiguration(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.eventhubs.models.SendOptions;

/**
* Sample demonstrates how to use an intermediary service to connect to Azure Event Hubs. In this demo, an application
* gateway.
*/
public class PublishEventsCustomEndpoint {
/**
* Main method to invoke this demo about how to use an intermediary service to connect to Azure Event Hubs.
*
* @param args Unused arguments to the program.
*/
public static void main(String[] args) {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 3. Creating a "Shared access policy" for your Event Hubs namespace.
// 4. Copying the connection string from the policy's properties.
// (The default policy name is "RootManageSharedAccessKey".)
String connectionString = "Endpoint={endpoint};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={sharedAccessKey}";
String eventHubName = "<< my-event-hub-name >>";

// The address of our intermediary service.
String customEndpoint = "<< https://my-application-gateway.cloudapp.azure.com >>";

// Instantiate a client that will be used to call the service.
// We are using WEB_SOCKETS.
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.customEndpointAddress(customEndpoint)
.buildProducerClient();

// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
// identifier to get information about each partition.
final EventHubProperties properties = producer.getEventHubProperties();
System.out.printf("Event Hub Information: %s; Created: %s; PartitionIds: [%s]%n",
properties.getName(),
properties.getCreatedAt(),
String.join(", ", properties.getPartitionIds()));

// Sending an event to a specific partition.
final EventData event = new EventData("Hello world");
final SendOptions sendOptions = new SendOptions()
.setPartitionId("0");

System.out.println("Sending event to partition: " + sendOptions.getPartitionId());
producer.send(event, sendOptions);

System.out.println("Disposing of producer");
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

class EventHubProducerAsyncClientTest {
Expand Down Expand Up @@ -275,7 +275,7 @@ void sendSingleMessageWithBlock() throws InterruptedException {
final Message message = singleMessageCaptor.getValue();
Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -303,7 +303,7 @@ void partitionProducerCannotSendWithPartitionKey() {
.expectError(IllegalArgumentException.class)
.verify(Duration.ofSeconds(10));

verifyZeroInteractions(sendLink);
verifyNoInteractions(sendLink);
}

/**
Expand Down Expand Up @@ -366,7 +366,7 @@ void sendStartSpanSingleMessage() {
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
verify(tracer1, times(3)).end(eq("success"), isNull(), any());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -554,7 +554,7 @@ void startMessageSpansOnCreateBatch() {
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
verify(tracer1, times(1)).end(eq("success"), isNull(), any());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -853,7 +853,7 @@ void closesDedicatedConnection() {

// Verify
verify(hubConnection, times(1)).dispose();
verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand All @@ -873,7 +873,7 @@ void closesDedicatedConnectionOnlyOnce() {

// Verify
verify(hubConnection, times(1)).dispose();
verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -945,9 +945,9 @@ void reopensOnFailure() {
Assertions.assertEquals(count, messagesSent.size());

verify(sendLink2, times(1)).send(any(Message.class));
verifyZeroInteractions(sendLink3);
verifyNoInteractions(sendLink3);

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -1024,9 +1024,9 @@ void closesOnNonTransientFailure() {
final List<Message> messagesSent = messagesCaptor.getValue();
Assertions.assertEquals(count, messagesSent.size());

verifyZeroInteractions(sendLink2);
verifyZeroInteractions(sendLink3);
verifyZeroInteractions(onClientClosed);
verifyNoInteractions(sendLink2);
verifyNoInteractions(sendLink3);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -1101,9 +1101,9 @@ void resendMessageOnTransientLinkFailure() {
Assertions.assertEquals(count, messagesSent.size());

verify(sendLink2, times(1)).send(any(Message.class));
verifyZeroInteractions(sendLink3);
verifyNoInteractions(sendLink3);

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -204,7 +204,7 @@ public void sendStartSpanSingleMessage() {
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
verify(tracer1, times(2)).end(eq("success"), isNull(), any());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public void sendMessageRetrySpanTest() {
verify(tracer1, times(1)).addLink(any());
verify(tracer1, times(1)).end(eq("success"), isNull(), any());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -320,7 +320,7 @@ public void sendMultipleMessages() {

messagesSent.forEach(message -> Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType()));

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down Expand Up @@ -404,7 +404,7 @@ public void startsMessageSpanOnEventBatch() {
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
verify(tracer1, times(2)).end(eq("success"), isNull(), any());

verifyZeroInteractions(onClientClosed);
verifyNoInteractions(onClientClosed);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

class AmqpReceiveLinkProcessorTest {
Expand Down Expand Up @@ -346,7 +346,7 @@ void noSubscribersWhenTerminated() {
linkProcessor.onSubscribe(subscription);

// Assert
verifyZeroInteractions(subscription);
verifyNoInteractions(subscription);
}


Expand Down

0 comments on commit 03ce19a

Please sign in to comment.