Skip to content

Commit 03ce19a

Browse files
authored
Adds support for custom endpoints (Azure#18934)
* Exposing customEndpointAddress in Event HubClientBuilder * Add CHANGELOG entry. * Remove verifyZeroInteractions * Adding sample.
1 parent e34b2cb commit 03ce19a

File tree

6 files changed

+119
-23
lines changed

6 files changed

+119
-23
lines changed

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 5.5.0-beta.1 (Unreleased)
44
- Use `BinaryData` in `EventData`.
5+
- Expose `EventHubsClientBuilder.customEndpointAddress` to support connecting to an intermediary before Azure Event
6+
Hubs.
57

68
## 5.4.0 (2021-01-14)
79
### New features

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import reactor.core.scheduler.Schedulers;
3838

3939
import java.net.InetSocketAddress;
40+
import java.net.MalformedURLException;
4041
import java.net.Proxy;
42+
import java.net.URL;
4143
import java.util.Locale;
4244
import java.util.Map;
4345
import java.util.Objects;
@@ -144,6 +146,7 @@ public class EventHubClientBuilder {
144146
private Integer prefetchCount;
145147
private ClientOptions clientOptions;
146148
private SslDomain.VerifyMode verifyMode;
149+
private URL customEndpointAddress;
147150

148151
/**
149152
* Keeps track of the open clients that were created from this builder when there is a shared connection.
@@ -266,6 +269,33 @@ public EventHubClientBuilder configuration(Configuration configuration) {
266269
this.configuration = configuration;
267270
return this;
268271
}
272+
/**
273+
* Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network
274+
* does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through
275+
* an intermediary. For example: {@literal https://my.custom.endpoint.com:55300}.
276+
* <p>
277+
* If no port is specified, the default port for the {@link #transportType(AmqpTransportType) transport type} is
278+
* used.
279+
*
280+
* @param customEndpointAddress The custom endpoint address.
281+
* @return The updated {@link EventHubClientBuilder} object.
282+
* @throws IllegalArgumentException if {@code customEndpointAddress} cannot be parsed into a valid {@link URL}.
283+
*/
284+
public EventHubClientBuilder customEndpointAddress(String customEndpointAddress) {
285+
if (customEndpointAddress == null) {
286+
this.customEndpointAddress = null;
287+
return this;
288+
}
289+
290+
try {
291+
this.customEndpointAddress = new URL(customEndpointAddress);
292+
} catch (MalformedURLException e) {
293+
throw logger.logExceptionAsError(
294+
new IllegalArgumentException(customEndpointAddress + " : is not a valid URL.", e));
295+
}
296+
297+
return this;
298+
}
269299

270300
/**
271301
* Toggles the builder to use the same connection for producers or consumers that are built from this instance. By
@@ -662,8 +692,14 @@ private ConnectionOptions getConnectionOptions() {
662692
? verifyMode
663693
: SslDomain.VerifyMode.VERIFY_PEER_NAME;
664694

665-
return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport, retryOptions,
666-
proxyOptions, scheduler, options, verificationMode);
695+
if (customEndpointAddress == null) {
696+
return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport,
697+
retryOptions, proxyOptions, scheduler, options, verificationMode);
698+
} else {
699+
return new ConnectionOptions(fullyQualifiedNamespace, credentials, authorizationType, transport,
700+
retryOptions, proxyOptions, scheduler, options, verificationMode, customEndpointAddress.getHost(),
701+
customEndpointAddress.getPort());
702+
}
667703
}
668704

669705
private ProxyOptions getDefaultProxyConfiguration(Configuration configuration) {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.eventhubs;
5+
6+
import com.azure.core.amqp.AmqpTransportType;
7+
import com.azure.messaging.eventhubs.models.SendOptions;
8+
9+
/**
10+
* Sample demonstrates how to use an intermediary service to connect to Azure Event Hubs. In this demo, an application
11+
* gateway.
12+
*/
13+
public class PublishEventsCustomEndpoint {
14+
/**
15+
* Main method to invoke this demo about how to use an intermediary service to connect to Azure Event Hubs.
16+
*
17+
* @param args Unused arguments to the program.
18+
*/
19+
public static void main(String[] args) {
20+
// The connection string value can be obtained by:
21+
// 1. Going to your Event Hubs namespace in Azure Portal.
22+
// 3. Creating a "Shared access policy" for your Event Hubs namespace.
23+
// 4. Copying the connection string from the policy's properties.
24+
// (The default policy name is "RootManageSharedAccessKey".)
25+
String connectionString = "Endpoint={endpoint};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={sharedAccessKey}";
26+
String eventHubName = "<< my-event-hub-name >>";
27+
28+
// The address of our intermediary service.
29+
String customEndpoint = "<< https://my-application-gateway.cloudapp.azure.com >>";
30+
31+
// Instantiate a client that will be used to call the service.
32+
// We are using WEB_SOCKETS.
33+
EventHubProducerClient producer = new EventHubClientBuilder()
34+
.connectionString(connectionString, eventHubName)
35+
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
36+
.customEndpointAddress(customEndpoint)
37+
.buildProducerClient();
38+
39+
// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
40+
// identifier to get information about each partition.
41+
final EventHubProperties properties = producer.getEventHubProperties();
42+
System.out.printf("Event Hub Information: %s; Created: %s; PartitionIds: [%s]%n",
43+
properties.getName(),
44+
properties.getCreatedAt(),
45+
String.join(", ", properties.getPartitionIds()));
46+
47+
// Sending an event to a specific partition.
48+
final EventData event = new EventData("Hello world");
49+
final SendOptions sendOptions = new SendOptions()
50+
.setPartitionId("0");
51+
52+
System.out.println("Sending event to partition: " + sendOptions.getPartitionId());
53+
producer.send(event, sendOptions);
54+
55+
System.out.println("Disposing of producer");
56+
producer.close();
57+
}
58+
}

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
import static org.mockito.Mockito.times;
7979
import static org.mockito.Mockito.verify;
8080
import static org.mockito.Mockito.verifyNoMoreInteractions;
81-
import static org.mockito.Mockito.verifyZeroInteractions;
81+
import static org.mockito.Mockito.verifyNoInteractions;
8282
import static org.mockito.Mockito.when;
8383

8484
class EventHubProducerAsyncClientTest {
@@ -275,7 +275,7 @@ void sendSingleMessageWithBlock() throws InterruptedException {
275275
final Message message = singleMessageCaptor.getValue();
276276
Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType());
277277

278-
verifyZeroInteractions(onClientClosed);
278+
verifyNoInteractions(onClientClosed);
279279
}
280280

281281
/**
@@ -303,7 +303,7 @@ void partitionProducerCannotSendWithPartitionKey() {
303303
.expectError(IllegalArgumentException.class)
304304
.verify(Duration.ofSeconds(10));
305305

306-
verifyZeroInteractions(sendLink);
306+
verifyNoInteractions(sendLink);
307307
}
308308

309309
/**
@@ -366,7 +366,7 @@ void sendStartSpanSingleMessage() {
366366
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
367367
verify(tracer1, times(3)).end(eq("success"), isNull(), any());
368368

369-
verifyZeroInteractions(onClientClosed);
369+
verifyNoInteractions(onClientClosed);
370370
}
371371

372372
/**
@@ -554,7 +554,7 @@ void startMessageSpansOnCreateBatch() {
554554
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
555555
verify(tracer1, times(1)).end(eq("success"), isNull(), any());
556556

557-
verifyZeroInteractions(onClientClosed);
557+
verifyNoInteractions(onClientClosed);
558558
}
559559

560560
/**
@@ -853,7 +853,7 @@ void closesDedicatedConnection() {
853853

854854
// Verify
855855
verify(hubConnection, times(1)).dispose();
856-
verifyZeroInteractions(onClientClosed);
856+
verifyNoInteractions(onClientClosed);
857857
}
858858

859859
/**
@@ -873,7 +873,7 @@ void closesDedicatedConnectionOnlyOnce() {
873873

874874
// Verify
875875
verify(hubConnection, times(1)).dispose();
876-
verifyZeroInteractions(onClientClosed);
876+
verifyNoInteractions(onClientClosed);
877877
}
878878

879879
/**
@@ -945,9 +945,9 @@ void reopensOnFailure() {
945945
Assertions.assertEquals(count, messagesSent.size());
946946

947947
verify(sendLink2, times(1)).send(any(Message.class));
948-
verifyZeroInteractions(sendLink3);
948+
verifyNoInteractions(sendLink3);
949949

950-
verifyZeroInteractions(onClientClosed);
950+
verifyNoInteractions(onClientClosed);
951951
}
952952

953953
/**
@@ -1024,9 +1024,9 @@ void closesOnNonTransientFailure() {
10241024
final List<Message> messagesSent = messagesCaptor.getValue();
10251025
Assertions.assertEquals(count, messagesSent.size());
10261026

1027-
verifyZeroInteractions(sendLink2);
1028-
verifyZeroInteractions(sendLink3);
1029-
verifyZeroInteractions(onClientClosed);
1027+
verifyNoInteractions(sendLink2);
1028+
verifyNoInteractions(sendLink3);
1029+
verifyNoInteractions(onClientClosed);
10301030
}
10311031

10321032
/**
@@ -1101,9 +1101,9 @@ void resendMessageOnTransientLinkFailure() {
11011101
Assertions.assertEquals(count, messagesSent.size());
11021102

11031103
verify(sendLink2, times(1)).send(any(Message.class));
1104-
verifyZeroInteractions(sendLink3);
1104+
verifyNoInteractions(sendLink3);
11051105

1106-
verifyZeroInteractions(onClientClosed);
1106+
verifyNoInteractions(onClientClosed);
11071107
}
11081108

11091109
private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec "

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
import static org.mockito.Mockito.never;
6363
import static org.mockito.Mockito.times;
6464
import static org.mockito.Mockito.verify;
65-
import static org.mockito.Mockito.verifyZeroInteractions;
65+
import static org.mockito.Mockito.verifyNoInteractions;
6666
import static org.mockito.Mockito.when;
6767

6868
/**
@@ -204,7 +204,7 @@ public void sendStartSpanSingleMessage() {
204204
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
205205
verify(tracer1, times(2)).end(eq("success"), isNull(), any());
206206

207-
verifyZeroInteractions(onClientClosed);
207+
verifyNoInteractions(onClientClosed);
208208
}
209209

210210
/**
@@ -255,7 +255,7 @@ public void sendMessageRetrySpanTest() {
255255
verify(tracer1, times(1)).addLink(any());
256256
verify(tracer1, times(1)).end(eq("success"), isNull(), any());
257257

258-
verifyZeroInteractions(onClientClosed);
258+
verifyNoInteractions(onClientClosed);
259259
}
260260

261261
/**
@@ -320,7 +320,7 @@ public void sendMultipleMessages() {
320320

321321
messagesSent.forEach(message -> Assertions.assertEquals(Section.SectionType.Data, message.getBody().getType()));
322322

323-
verifyZeroInteractions(onClientClosed);
323+
verifyNoInteractions(onClientClosed);
324324
}
325325

326326
/**
@@ -404,7 +404,7 @@ public void startsMessageSpanOnEventBatch() {
404404
.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE));
405405
verify(tracer1, times(2)).end(eq("success"), isNull(), any());
406406

407-
verifyZeroInteractions(onClientClosed);
407+
verifyNoInteractions(onClientClosed);
408408
}
409409

410410
/**

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static org.mockito.ArgumentMatchers.eq;
3737
import static org.mockito.Mockito.mock;
3838
import static org.mockito.Mockito.verify;
39-
import static org.mockito.Mockito.verifyZeroInteractions;
39+
import static org.mockito.Mockito.verifyNoInteractions;
4040
import static org.mockito.Mockito.when;
4141

4242
class AmqpReceiveLinkProcessorTest {
@@ -346,7 +346,7 @@ void noSubscribersWhenTerminated() {
346346
linkProcessor.onSubscribe(subscription);
347347

348348
// Assert
349-
verifyZeroInteractions(subscription);
349+
verifyNoInteractions(subscription);
350350
}
351351

352352

0 commit comments

Comments
 (0)