Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void logCheckpointSuccess(EventContext context, EventData eventData) {
public void checkpoint(EventContext context) {
}

@Override
public void checkpoint(EventBatchContext context) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
/**
* No need to do checkpoint in manual mode. Effective when {@link CheckpointMode#MANUAL}
*
* @author Warren Zhu
*/
public class ManualCheckpointManager extends EventCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(ManualCheckpointManager.class);
Expand All @@ -24,6 +23,7 @@ public class ManualCheckpointManager extends EventCheckpointManager {
() -> "ManualCheckpointManager should have checkpointMode manual");
}

@Override
protected Logger getLogger() {
return LOG;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* Do checkpoint when uncheckpointed count exceeds {@link CheckpointConfig#getCount()} ()} for one partition.
* Effective when {@link CheckpointMode#PARTITION_COUNT}
*
* @author Warren Zhu
*/
class PartitionCountCheckpointManager extends EventCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(PartitionCountCheckpointManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* Do checkpoint after each message successfully processed.
* Effective when {@link CheckpointMode#RECORD}
*
* @author Warren Zhu
*/
class RecordCheckpointManager extends EventCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(RecordCheckpointManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* Do checkpoint when the time since last successful checkpoint exceeds {@link CheckpointConfig#getInterval()} ()}
* for one partition. Effective when {@link CheckpointMode#PARTITION_COUNT}
*
* @author Warren Zhu
*/
class TimeCheckpointManager extends EventCheckpointManager {
private static final Logger LOG = LoggerFactory.getLogger(TimeCheckpointManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@


/**
* A processor container using {@link EventProcessorClient} to subscribe to Event Hub entities and consumer events from
* all the partitions of each Event Hub entity.
*
* <p>
* For different combinations of Event Hub and consumer group, different {@link EventProcessorClient}s will be created to
* subscribe to it.
* </p>
*
* Implementation of {@link EventProcessingListener} is required to be provided when using {@link EventHubsProcessorContainer}
* to consume events.
* @see EventProcessingListener
*/
public class EventHubsProcessorContainer implements Lifecycle, DisposableBean {

Expand All @@ -27,6 +37,10 @@ public class EventHubsProcessorContainer implements Lifecycle, DisposableBean {
private final List<EventProcessorClient> clients = new ArrayList<>();
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
* Create an instance using the supplied processor factory.
* @param processorFactory the processor factory.
*/
public EventHubsProcessorContainer(EventHubsProcessorFactory processorFactory) {
this.processorFactory = processorFactory;
}
Expand All @@ -36,6 +50,10 @@ public void destroy() {
stop();
}

/**
* Start all {@link EventProcessorClient}s created by this processor container to consume events from all partitions
* of the associated destinations and consumer groups.
*/
@Override
public void start() {
if (!isRunning.compareAndSet(false, true)) {
Expand All @@ -45,6 +63,10 @@ public void start() {
this.clients.forEach(EventProcessorClient::start);
}

/**
* Stop all {@link EventProcessorClient}s owned by this processor container to process events
* for all partitions owned by the related event processor clients.
*/
@Override
public void stop() {
if (!isRunning.compareAndSet(true, false)) {
Expand All @@ -60,6 +82,14 @@ public boolean isRunning() {
}


/**
* Subscribe to an Event Hub in the context of a consumer group to consumer events from all the partitions.
*
* @param eventHubName Event Hub entity name
* @param consumerGroup Consumer group name
* @param listener the listener to process Event Hub events.
* @return the {@link EventProcessorClient} created to subscribe.
*/
public EventProcessorClient subscribe(String eventHubName, String consumerGroup, EventProcessingListener listener) {
EventProcessorClient processor = this.processorFactory.createProcessor(eventHubName, consumerGroup, listener);
processor.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@
* The Azure Event Hubs specific {@link NestedRuntimeException}.
*
*/
public class EventHubsRuntimeException extends NestedRuntimeException {
public final class EventHubsRuntimeException extends NestedRuntimeException {

/**
* Construct {@code EventHubsRuntimeException} with the specified detail message.
* @param msg the exception information.
*/
public EventHubsRuntimeException(String msg) {
super(msg);
}

/**
* Construct {@code EventHubsRuntimeException} with the specified detail message and nested exception.
* @param msg the specified detail message.
* @param cause the nested exception.
*/
public EventHubsRuntimeException(String msg, Throwable cause) {
super(msg, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.stream.Collectors;

/**
*
* A template for executing sending operations asynchronously to Event Hubs.
*/
public class EventHubsTemplate implements SendOperation, BatchSendOperation {

Expand All @@ -35,6 +35,10 @@ public class EventHubsTemplate implements SendOperation, BatchSendOperation {
private final EventHubsProducerFactory producerFactory;
private EventHubsMessageConverter messageConverter = new EventHubsMessageConverter();

/**
* Create an instance using the supplied producer factory.
* @param producerFactory the producer factory.
*/
public EventHubsTemplate(EventHubsProducerFactory producerFactory) {
this.producerFactory = producerFactory;
}
Expand Down Expand Up @@ -106,6 +110,10 @@ private CreateBatchOptions buildCreateBatchOptions(PartitionSupplier partitionSu
.setPartitionKey(partitionSupplier != null ? partitionSupplier.getPartitionKey() : null);
}

/**
* Set the message converter.
* @param messageConverter the message converter.
*/
public void setMessageConverter(EventHubsMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* The {@link EventHubsProcessorFactory} implementation to produce new {@link EventProcessorClient} instances
* for provided {@link CheckpointStore} {@code checkpointStore} and optional {@link NamespaceProperties} and
* processor {@link PropertiesSupplier} on each {@link #createProcessor} invocation.
*
* <p>
* The created {@link EventProcessorClient}s are cached according to the event hub names and consumer groups.
* </p>
* <p>
* {@link EventProcessorClient} produced by this factory will share the same namespace level configuration, but if a
* configuration entry is provided at both processor and namespace level, the processor level configuration will take
* advantage.
* </p>
*/
public class DefaultEventHubsNamespaceProcessorFactory implements EventHubsProcessorFactory, DisposableBean {
public final class DefaultEventHubsNamespaceProcessorFactory implements EventHubsProcessorFactory, DisposableBean {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubsNamespaceProcessorFactory.class);

Expand All @@ -42,20 +51,41 @@ public class DefaultEventHubsNamespaceProcessorFactory implements EventHubsProce
private final Map<Tuple2<String, String>, EventProcessorClient> processorClientMap = new ConcurrentHashMap<>();
private final ProcessorPropertiesParentMerger propertiesMerger = new ProcessorPropertiesParentMerger();

/**
* Construct a factory with the provided {@link CheckpointStore}.
* @param checkpointStore the checkpoint store.
*/
public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore) {
this(checkpointStore, null, null);
}

/**
* Construct a factory with the provided {@link CheckpointStore} and namespace level properties.
* @param checkpointStore the checkpoint store.
* @param namespaceProperties the namespace properties.
*/
public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore,
NamespaceProperties namespaceProperties) {
this(checkpointStore, namespaceProperties, key -> null);
}

/**
* Construct a factory with the provided {@link CheckpointStore} and processor {@link PropertiesSupplier}.
* @param checkpointStore the checkpoint store.
* @param supplier the {@link PropertiesSupplier} to supply {@link ProcessorProperties} for each event hub.
*/
public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore,
PropertiesSupplier<Tuple2<String, String>,
ProcessorProperties> supplier) {
this(checkpointStore, null, supplier);
}

/**
* Construct a factory with the provided {@link CheckpointStore}, namespace level properties and processor {@link PropertiesSupplier}.
* @param checkpointStore the checkpoint store.
* @param namespaceProperties the namespace properties.
* @param supplier the {@link PropertiesSupplier} to supply {@link ProcessorProperties} for each event hub.
*/
public DefaultEventHubsNamespaceProcessorFactory(CheckpointStore checkpointStore,
NamespaceProperties namespaceProperties,
PropertiesSupplier<Tuple2<String, String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,29 @@
*/
public interface EventHubsProcessorFactory {

/**
* Create an {@link EventProcessorClient} to consume events from the specified event hub in the context of the given
* consumer group.
* @param eventHub the event hub to consume events from
* @param consumerGroup the consumer group
* @param listener the {@link EventProcessingListener} to consume events with
* @return the EventProcessorClient.
*/
EventProcessorClient createProcessor(String eventHub, String consumerGroup, EventProcessingListener listener);

/**
* Add a listener for this factory.
* @param listener the listener
*/
default void addListener(Listener listener) {

}

/**
* Remove a listener
* @param listener the listener
* @return true if removed.
*/
default boolean removeListener(Listener listener) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* The {@link EventHubsProducerFactory} implementation to produce cached {@link EventHubProducerAsyncClient} instances
* for provided {@link NamespaceProperties} and optional producer {@link PropertiesSupplier} on each
* {@link #createProducer} invocation.
* <p>
* The created {@link EventHubProducerAsyncClient}s are cached according to the event hub names.
* =</p>
* <p>
* {@link EventHubProducerAsyncClient} produced by this factory will share the same namespace level configuration, but
* if a configuration entry is provided at both producer and namespace level, the producer level configuration will
* take advantage.
* </p>
*/
public final class DefaultEventHubsNamespaceProducerFactory implements EventHubsProducerFactory, DisposableBean {

Expand All @@ -31,10 +39,19 @@ public final class DefaultEventHubsNamespaceProducerFactory implements EventHubs
private final Map<String, EventHubProducerAsyncClient> clients = new ConcurrentHashMap<>();
private final ProducerPropertiesParentMerger parentMerger = new ProducerPropertiesParentMerger();

/**
* Construct a factory with the provided namespace level configuration.
* @param namespaceProperties the namespace properties
*/
public DefaultEventHubsNamespaceProducerFactory(NamespaceProperties namespaceProperties) {
this(namespaceProperties, key -> null);
}

/**
* Construct a factory with the provided namespace level configuration and producer {@link PropertiesSupplier}.
* @param namespaceProperties the namespace properties.
* @param supplier the {@link PropertiesSupplier} to supply {@link ProducerProperties} for each event hub.
*/
public DefaultEventHubsNamespaceProducerFactory(NamespaceProperties namespaceProperties,
PropertiesSupplier<String, ProducerProperties> supplier) {
this.namespaceProperties = namespaceProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,26 @@
*/
public interface EventHubsProducerFactory {

/**
* Create {@link EventHubProducerAsyncClient} to send events to the event hub.
* @param eventHub the event hub
* @return the producer.
*/
EventHubProducerAsyncClient createProducer(String eventHub);

/**
* Add a listener for this factory.
* @param listener the listener
*/
default void addListener(Listener listener) {

}

/**
* Remove a listener
* @param listener the listener
* @return true if removed.
*/
default boolean removeListener(Listener listener) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private String extractEventHubNameFromConnectionString() {
// servicebus.windows.net)
// Endpoint=sb://<FQDN>/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>
// https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
@Override
public String getFQDN() {
return this.namespace == null ? extractFqdnFromConnectionString() : (this.namespace + "." + domainName);
}
Expand All @@ -48,6 +49,10 @@ public String getDomainName() {
return domainName;
}

/**
* Set the domain name.
* @param domainName the domain name.
*/
public void setDomainName(String domainName) {
this.domainName = domainName;
}
Expand All @@ -57,6 +62,10 @@ public String getNamespace() {
return namespace;
}

/**
* Set the namespace.
* @param namespace the namespace.
*/
public void setNamespace(String namespace) {
this.namespace = namespace;
}
Expand All @@ -66,6 +75,10 @@ public String getEventHubName() {
return eventHubName == null ? extractEventHubNameFromConnectionString() : this.eventHubName;
}

/**
* Set the event hub name.
* @param eventHubName the event hub name.
*/
public void setEventHubName(String eventHubName) {
this.eventHubName = eventHubName;
}
Expand All @@ -75,6 +88,10 @@ public String getConnectionString() {
return connectionString;
}

/**
* Set the connection string.
* @param connectionString the connection string.
*/
public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}
Expand All @@ -84,6 +101,10 @@ public String getCustomEndpointAddress() {
return customEndpointAddress;
}

/**
* Set the custom endpoint address.
* @param customEndpointAddress the custom endpoint address.
*/
public void setCustomEndpointAddress(String customEndpointAddress) {
this.customEndpointAddress = customEndpointAddress;
}
Expand Down
Loading