Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ sequence number and the timestamp of when it was enqueued.

### Create an instance of Storage container with SAS token

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/ReadmeSamples.java#L25-L29 -->
```java
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
Expand All @@ -91,32 +92,35 @@ In our example, we will focus on building the [`EventProcessor`][source_eventpro
[`BlobCheckpointStore`][source_blobcheckpointstore], and a simple callback function to process the events
received from the Event Hubs, writes to console and updates the checkpoint in Blob storage after each event.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/ReadmeSamples.java#L37-L63 -->
```java
class Program {
public static void main(String[] args) {
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
}
}
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

/**
* WARNING: MODIFYING THIS FILE WILL REQUIRE CORRESPONDING UPDATES TO README.md FILE. LINE NUMBERS ARE USED TO EXTRACT
* APPROPRIATE CODE SEGMENTS FROM THIS FILE. ADD NEW CODE AT THE BOTTOM TO AVOID CHANGING LINE NUMBERS OF EXISTING CODE
* SAMPLES.
*
* Class containing code snippets that will be injected to README.md.
*/
public class ReadmeSamples {

/**
* Code sample for creating an async blob container client.
*/
public void createBlobContainerClient() {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
}

/**
* Code sample for consuming events from event processor with blob checkpoint store.
* @throws InterruptedException If the thread is interrupted.
*/
public void consumeEventsUsingEventProcessor() throws InterruptedException {
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
}
}
60 changes: 34 additions & 26 deletions sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ consumers.

The snippet below creates a synchronous Event Hub producer.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L30-L34 -->
```java
String connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
String eventHubName = "<< NAME OF THE EVENT HUB >>";
Expand Down Expand Up @@ -123,6 +124,7 @@ Follow the instructions in [Creating a service principal using Azure Portal][app
service principal and a client secret. The corresponding `clientId` and `tenantId` for the service principal can be
obtained from the [App registration page][app_registration_page].

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L41-L53 -->
```java
ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.clientId("<< APPLICATION (CLIENT) ID >>")
Expand Down Expand Up @@ -188,6 +190,7 @@ Event Hubs service to hash the events and send them to the same partition.
The snippet below creates a synchronous producer and sends events to any partition, allowing Event Hubs service to route
the event to an available partition.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L61-L83 -->
```java
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
Expand All @@ -208,7 +211,6 @@ for (EventData eventData : allEvents) {
}
}
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
Expand All @@ -226,6 +228,7 @@ Hub, their names are assigned at the time of creation. To understand what partit
using `EventHubsClientBuilder` can query for metadata about the Event Hub using `getPartitionIds()` or
`getEventHubProperties()`.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L90-L98 -->
```java
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
Expand All @@ -244,7 +247,12 @@ When an Event Hub producer is not associated with any specific partition, it may
Hubs service keep different events or batches of events together on the same partition. This can be accomplished by
setting a `partition key` when publishing the events.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L105-L113 -->
```java
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
.buildProducerClient();

CreateBatchOptions batchOptions = new CreateBatchOptions().setPartitionKey("grouping-key");
EventDataBatch eventDataBatch = producer.createBatch(batchOptions);

Expand All @@ -266,6 +274,7 @@ to newest events that get pushed to the partition by invoking `receiveFromPartit
can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling
`receiveFromPartition(String, EventPosition)` with another partition id, and subscribing to that Flux.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L120-L128 -->
```java
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
Expand All @@ -283,6 +292,7 @@ consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
Developers can create a synchronous consumer that returns events in batches using an `EventHubConsumerClient`. In the
snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L135-L147 -->
```java
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
Expand Down Expand Up @@ -313,32 +323,30 @@ In our example, we will focus on building the [`EventProcessorClient`][EventProc
[`InMemoryCheckpointStore`][InMemoryCheckpointStore] available in samples, and a callback function that processes events
received from the Event Hub and writes to console.

<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L155-L176 -->
```java
class Program {
public static void main(String[] args) {
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new InMemoryCheckpointStore())
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
}
}
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new InMemoryCheckpointStore())
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out
.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// This will stop processing events.
eventProcessorClient.stop();
```

## Troubleshooting
Expand Down
Loading