-
Notifications
You must be signed in to change notification settings - Fork 737
Databus 2.0 Client Load Balancing API
The new API to support client load balancing will allow the client application to get a handle for each registration, which it can use to operate and get stats about the pipeline(s) (pullers + dispatchers). This registration handle will be similar to the Databus V3 registration interface.
There will be two new interfaces:
- DatabusRegistration: This contains API to let clients operate the pipeline for this registration.
- Multi Partition Registration: This is not a public interface. This extends DatabusRegistration interface. When clients register for cluster-aware consumption (e.g load-balanced clients), an object implementing this interface will be constructed but returned as base type (DatabusRegistration). The multi-partition registration will be managing individual registrations for each partition under the hood.
A Base Interface for Registration which is the databus-client handle for the client application to handle the consumer callbacks that were registered in a single register() call. An object implementing this interface will be returned for registrations of types:
- A single partition belonging to a source-partitioned stream (eg. Consumer of a single partition for Espresso)
- A source unpartitioned stream (with/o server-side partition) (e.g Databus V2 use-case)
package com.linkedin.databus.client.pub; import java.util.List; import org.apache.log4j.Logger; import com.linkedin.databus.client.pub.mbean.ConsumerCallbackStatsMBean; import com.linkedin.databus.core.Checkpoint; import com.linkedin.databus.core.DatabusComponentStatus; import com.linkedin.databus.core.data_model.DatabusSubscription; import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollectorMBean; import com.linkedin.databus2.core.filter.DbusKeyCompositeFilterConfig; /** * * Base Interface for Registration which is the databus-client handle for the client application * to handle the consumer callbacks that were registered in a single register() call. * */ public interface DatabusRegistration { public enum RegistrationState { INIT, // Initialization state. Dbus Client library has not setup registration for its consumers yet. REGISTERED, // Consumers have been registered but pipeline not started yet. STARTED, // Pipeline(s) started PAUSED, // Pipeline(s) paused RESUMED, // Pipeline(s) resumed SHUTDOWN, // Pipeline shutdown DEREGISTERED; // State when the client library unregisters and removes this from its internal data-structures. /** * pipeline(s) running * @return */ public boolean isRunning() { switch (this) { case STARTED: case PAUSED: case RESUMED: return true; default: return false; } } /** * Registration is actively maintained in the client library * @return */ public boolean isActiveRegistration() { switch (this) { case REGISTERED: case STARTED: case PAUSED: case RESUMED: case SHUTDOWN: return true; default: return false; } } }; /** * API used to start the pipeline(s) (pullers + dispatchers) * * @throws IllegalStateException if the registration is not in REGISTERED state. * @throws DatabusClientException * If there are no subscriptions or callbacks registered * if this registration cannot service the sources/subscriptions together. * @return false if the client is already started else returns true. 
* */ public boolean start() throws IllegalStateException, DatabusClientException; /** * API used to shutdown the pipeline(s) (pullers + dispatchers) * * @throws IllegalStateException if the registration is not in REGISTERED state. */ public void shutdown() throws IllegalStateException; /** * API used to pause the pipeline(s) (pullers + dispatchers) * * @throws IllegalStateException if the registration is not in REGISTERED state. */ public void pause() throws IllegalStateException; /** * API used to resume the pipeline(s) (pullers + dispatchers) * * @throws IllegalStateException if the registration is not in REGISTERED state. */ public void resume() throws IllegalStateException; /** * * Get state of registration. */ public RegistrationState getState(); /** * * De-registers this registration from the client library. * @return false if this was already deregistered/not-found * else return true * @throws IllegalStateException if the registration is not in REGISTERED state */ public boolean deregister() throws IllegalStateException; /** * Obtains a cloned Collection of all subscriptions associated with this registration. Changing this subscription * will not have any effect on the registration * */ public Collection<DatabusSubscription> getSubscriptions(); /** * Returns an object that implements DatabusComponentStatus * Helpful for obtaining diagnostics regarding the registration and also to pause/resume the pipeline(s) (pullers+dispatchers) */ public DatabusComponentStatus getStatus(); /** Obtains a logger used by databus for logging messages associated with this registration */ public Logger getLogger(); /** Parent Registration if this is part of MultiPartitionRegistration */ public DatabusRegistration getParent(); /** * * API for building Registration with client-application defined regId * * @param regId * @return this instance after adding regId * @throws DatabusClientException if the regId is already being used. 
* @throws IllegalStateException if the registration has already started. */ public DatabusRegistration withRegId(RegistrationId regId) throws DatabusClientException, IllegalStateException; /** * * API for building Registration with serverSideFiltering * * @param filterConfig * @return this instance after adding server-side filter * @throws IllegalStateException if the registration has already started. */ public DatabusRegistration withServerSideFilter(DbusKeyCompositeFilterConfig filterConfig) throws IllegalStateException; /** * * For non-partitioned consumers, returns null. * For source-partitioned or load-balanced registration, returns the partition that this registration serves. * * @return partitionId */ public Collection<DbusPartitionInfo> getPartitions(); /** * Last Persisted checkpoint. This is a copy of the actual checkpoint * and changing this will not alter the dbus-client's checkpoint. */ public Checkpoint getLastPersistedCheckpoint(); /** * API to allow checkpoint to be set by the client application. 
Storing checkpoint * @throws IllegalStateException if the Registration state is not in one of the below states : ( INIT, REGISTERED ) */ public boolean storeCheckpoint(Checkpoint ckpt) throws IllegalStateException; /** Obtains the inbound relay event statistics for the registration */ public DbusEventsStatisticsCollectorMBean getRelayEventStats(); /** Obtains the inbound bootstrap event statistics */ public DbusEventsStatisticsCollectorMBean getBootstrapEventStats(); /** Obtain statistics for the callbacks for relay events*/ public ConsumerCallbackStatsMBean getRelayCallbackStats(); /** Obtain statistics for the callbacks for bootstrap events*/ public ConsumerCallbackStatsMBean getBootstrapCallbackStats(); /** * Fetch the most recent sequence number across all relays * @param FetchMaxSCNRequest : Request params for fetchMaxSCN * @return RelayFindMaxSCNResult * * Note : Not supported currently */ public RelayFindMaxSCNResult fetchMaxSCN(FetchMaxSCNRequest request) throws InterruptedException; /** * * Makes the Client point to the relays (specified in RelayFindMaxSCNResult) with the most recent sequence number * and waits (with timeout) for the consumer callback to reach this SCN. This is a * bounded blocking call. It will wait for timeout milliseconds for the consumer * callback to reach the maxScn before returning from this method * * @param fetchSCNResult : FetchMaxScn result object. * @param flushRequest : Request params for flush. * * Note : Not supported currently * */ public RelayFlushMaxSCNResult flush(RelayFindMaxSCNResult fetchSCNResult, FlushRequest flushRequest) throws InterruptedException; /** * * Discovers the most recent sequence number across all relays for the given subscriptions and uses flush * on the relay with that max SCN. * * @param FetchMaxSCNRequest : Request params for fetchMaxSCN. * @param flushRequest : Request params for flush. 
* * Note : Not supported currently */ public RelayFlushMaxSCNResult flush(FetchMaxSCNRequest maxScnRequest, FlushRequest flushRequest) throws InterruptedException; }
Registration which allows registering consumer(s) to subscribe to sources across source partitions (physical partitions or load-balanced partitions). Load-balanced cluster registration is an example for this use-case. This is not an external-facing API. The clients are expected to see all registrations as DatabusRegistration. But under the hood, an object of type DatabusMultiPartitionRegistration will be constructed and returned as base interface (DatabusRegistration) type.
In the case of cluster-aware registration, when the client library receives a Helix notification to subscribe to a new partition, the dbus client library will:
- Invoke DbusClusterConsumerFactory.createPartitionedConsumers(..) to let the client application create new consumer callbacks for this partition
- Invoke DbusServerSideFilterFactory (if provided) to let client application setup server-side filter.
- Construct a registration with the subscriptions provided during registerCluster() call and add the callbacks from the first step and the filter from the second step to it.
- Invoke DbusPartitionListener.onAddPartition(..) to let the client set checkpoints and regId if needed
- Start the new registration.
The load-balanced consumers will still have separate “DatabusRegistration” for each of the partitions they are listening to but each such “DatabusRegistration” will have the DatabusMultiPartitionRegistration as its parent.
/**
 * A {@link DatabusRegistration} that additionally exposes the per-partition
 * child registrations it is made of.
 */
public interface DatabusMultiPartitionRegistration extends DatabusRegistration
{
  /**
   * Looks up the child registrations, keyed by the partition each one belongs to.
   *
   * @return a read-only copy of the {@link DbusPartitionInfo} to {@link DatabusRegistration} mapping
   */
  Map<DbusPartitionInfo, DatabusRegistration> getPartitionRegs();
}
Encapsulates information about the cluster and its settings. Databus Client library is responsible for constructing this object and passing it as argument to listener callbacks for client processing.
/**
 * Value holder for the settings of a client cluster: its group name, its total number of
 * partitions and the minimum number of active nodes. All fields are final and are set once
 * by the constructor.
 */
public class DbusClusterInfo
{
  private final String _groupName;
  private final long _totalPartitions;
  private final long _minActiveNodes;

  /**
   * Creates the cluster description. The arguments are stored as given; no validation is done.
   *
   * @param groupName        group name of the cluster that the registration wants to join
   * @param totalPartitions  total number of partitions of the cluster
   * @param minActiveNodes   minimum number of client nodes that must be alive
   */
  public DbusClusterInfo(String groupName, long totalPartitions, long minActiveNodes)
  {
    _groupName = groupName;
    _totalPartitions = totalPartitions;
    _minActiveNodes = minActiveNodes;
  }

  /** Group Name of the cluster that this registration wants to join */
  public String getGroupName()
  {
    return _groupName;
  }

  /** Total number of partitions of the cluster. */
  public long getNumTotalPartitions()
  {
    return _totalPartitions;
  }

  /**
   * Minimum number of client nodes to be alive before this node starts listening to active partitions.
   * At any time, if the number of nodes becomes less than this critical number, the nodes in the
   * cluster will be suspended.
   */
  public long getMinimumActiveNodes()
  {
    return _minActiveNodes;
  }
}
Factory to instantiate new consumer callbacks
In the case of cluster-aware registration, when the client library receives a Helix notification to subscribe to a new partition, the dbus client library will:
- Invoke this callback to let client application create new consumer callbacks for this partition
- Invoke DbusServerSideFilterFactory (if provided) to let client application setup server-side filter.
- Construct a registration with the subscriptions provided during registerCluster() call and add the callbacks from the first step and the filter from the second step to it.
- Invoke DbusPartitionListener.onAddPartition(..) to let the client set checkpoints and regId if needed
- Start the new registration.
public interface DbusClusterConsumerFactory
{
  /**
   * Factory to instantiate new consumer callbacks.
   *
   * In the case of cluster-aware registration, when the client-library receives helix notification
   * to subscribe to a new partition, the dbus client library will
   *
   *  (a) invoke this callback to let client application create new consumer callbacks for this partition
   *  (b) invoke DbusServerSideFilterFactory (if provided) to let client application setup server-side filter.
   *  (c) construct a registration with the subscriptions provided during registerCluster() call and
   *      add the callbacks from (a) and filter from (b) to it.
   *  (d) invoke DbusPartitionListener.onAddPartition(..) to let the client set checkpoints and regId if needed
   *  (e) start the new registration.
   *
   * The client application is expected to create new instance of consumer callbacks and not reuse
   * consumer callbacks that have been associated with other registrations or returned in previous
   * createPartitionedConsumers() calls. No synchronization will be provided to ensure thread-safety
   * if consumer callbacks are reused.
   *
   * @param clusterInfo   DbusClusterInfo describing the cluster the client-app registered for
   * @param partitionInfo DbusPartitionInfo corresponding to the cluster manager notification
   * @return the newly created consumer callbacks for the partition
   */
  Collection<DatabusCombinedConsumer> createPartitionedConsumers(DbusClusterInfo clusterInfo,
                                                                 DbusPartitionInfo partitionInfo);
}
A factory to setup server-side filtering corresponding to new partitions that are getting added. Default implementations for this interface will be provided by the databus client library for both MOD and RANGE based partitions so that the client-application does not have to worry about implementing this transformation.
/**
 * Factory interface for generating server-side filter corresponding to the partition
 * that is getting added.
 */
public interface DbusServerSideFilterFactory
{
  /**
   * Creates the server-side filter configuration for one partition of a cluster.
   *
   * @param cluster   the cluster the partition belongs to
   * @param partition the partition that is getting added
   * @return the filter configuration for that partition
   * @throws InvalidConfigException presumably when no valid filter configuration can be built
   *         for the given arguments - TODO confirm against the implementations
   */
  public DbusKeyCompositeFilterConfig createServerSideFilter(DbusClusterInfo cluster, DbusPartitionInfo partition) throws InvalidConfigException;
}

Concrete Implementations:
--------------------------

Mod Partitioned:
----------------
public class DbusModPartitionedFilterFactory implements DbusServerSideFilterFactory { ... }

Range Partitioned:
------------------
public class DbusRangePartitionedFilterFactory implements DbusServerSideFilterFactory { ... }
Partition Interface for the client application. Created by databus client-library and provided to the client application in listener callbacks.
/**
 * Describes a single Databus partition.
 */
public interface DbusPartitionInfo
{
  /**
   * Identifies the partition.
   *
   * @return numeric id of this partition
   */
  long getPartitionId();

  /**
   * Compares another partition with this instance.
   *
   * @param other the partition to compare against
   * @return true if equal otherwise false
   */
  boolean equalsPartition(DbusPartitionInfo other);
}
Listener interface triggered for the cluster that was registered when this client library receives notification from clusterManager and detects additions/removal of partitions.
/**
 * Databus Partition Listener interface.
 *
 * The client application supplies the concrete implementation and hands it to
 * DatabusClient.registerCluster().
 */
public interface DbusPartitionListener
{
  /**
   * Called when this client instance starts listening to a new partition.
   * At that point the registration has been created with its call-backs, subscriptions and
   * server-side filter set, and is in INIT state. The client application can use this call
   * to change checkpoint/regId if needed.
   *
   * @param partitionInfo : Databus Partition Info
   * @param reg : Databus Registration controlling this stream partition.
   */
  void onAddPartition(DbusPartitionInfo partitionInfo, DatabusRegistration reg);

  /**
   * Called when the client instance stops listening to a partition.
   * The call happens after the registration is shutdown but before it is deregistered.
   *
   * @param partitionInfo : Databus Partition Info
   * @param reg : Databus Registration controlling this stream partition.
   */
  void onDropPartition(DbusPartitionInfo partitionInfo, DatabusRegistration reg);
}
public interface DatabusClient { ..... ..... /** * Creates registration for a list of sources for one consumer */ public DatabusRegistration register(DatabusCombinedConsumer consumer, String ... sources); /** * Creates registration for a list of sources for a collection of consumers */ public DatabusRegistration register(Collection<DatabusCombinedConsumer> consumers, String ... sources); /** * Creates a cluster-aware consumer */ public DatabusRegistration registerCluster(String cluster, DbusClusterConsumerFactory consumerFactory, DbusServerSideFilterFactory filterFactory, DatabusPartitionListener partitionListener, String ... sources); }
ClusterInfo will be provided as databus configs:
databus.client.cluster(1).name = "cluster1"
databus.client.cluster(1).totalPartitions=16
databus.client.cluster(1).minActiveNodes=5
databus.client.cluster(1).checkpointIntervalMs=300000
/**
 * Example application consumer used by the sample code on this page.
 * The application-specific callback implementation is elided here.
 */
public class MyConsumer extends AbstractDatabusCombinedConsumer
{
  ...........
  ...........
}
public class MyConsumerFactory implements DbusClusterConsumerFactory { @Override Collection<DatabusCombinedConsumer> createPartitionedConsumers(DbusClusterInfo clusterInfo, DbusPartitionInfo partitionInfo) { // Set Consumers (mandatory) DatabusCombinedConsumer c1 = new MyConsumer(); return Arrays.asList(c1); } } public class MyListener implements DbusPartitionListener { @Override public void onAddPartition(DbusPartitionInfo partitionInfo, DatabusRegistration reg) { // Set RegId (optional) reg.withRegId(new RegistrationId("myId" + partitionInfo+getPartitionId())); //Set Checkpoint (optional. Do only if you really want to mange the checkpoint) Checkpoint ckpt = new Checkpoint(); ckpt.setWindowSCN(...) .... reg.storeCheckpoint(ckpt); // Log and/or process this notification. } @Override public void onDropPartition(DbusPartitionInfo partitionInfo) { // Log and/or process this notification. } }
import com.linkedin.databus.core.util.ConfigBuilder; import com.linkedin.databus.core.util.ConfigLoader; import com.linkedin.databus.core.util.InvalidConfigException; import com.linkedin.databus.client.DatabusHttpClientImpl; public class Databus2ClientReg { private DatabusHttpClientImpl _httpClient; public registerDatabus2ClientAndStart(Properties dbus2ClientProps) { // Instantiate DatabusHttpClient Config configBuilder = new Config(); ConfigLoader<StaticConfig> configLoader = new ConfigLoader<StaticConfig>("databus2.client.", configBuilder); configLoader.loadConfig(databus2ClientProps); StaticConfig clientConfig = configBuilder.build(); _httpClient = new DatabusHttpClientImpl(_clientConfig.getClient()); //save it in member_variable for shutdown String src1 = "com.linkedin.events.liar.jobrelay.LiarJobRelay"; String src2 = "com.linkedin.events.liar.memberrelay.LiarMemberRelay"; // instantiate listener DbusPeerPartitionListener myListener = new MyListener(); //instantiate Consumer Factory DbusClusterConsumerFactory myConsFactory = new MyConsumerFactory(); //Instantiate Server-Side Filter (MOD partition) DbusModPartitionedFilterFactory filterFactory = new DbusModPartitionedFilterFactory(src1, src2); // register and start DatabusRegistration reg = _httpClient.registerCluster("myCluster", myConsFactory, filterFactory, myListener, src1, src2); reg.start(); _httpClient.start(); // Call _httpClient.shutdown during shutdown of the client application. } }
There are many ways for loading the configuration. Here is an example of using config2 semantics
<property name="<Your_app>.databus2.consumer.config">
  <props>
    <entry key="databus2.client.container.httpPort" value="<Databus Client Admin Port>" />
    <entry key="databus2.client.container.id" value="301" />
    <entry key="databus2.client.checkpointPersistence.fileSystem.rootDirectory" value="/export/content/<YOUR_APPLICATION_NAME>/i001/databus2-relay-liar-checkpoints" />
    <entry key="databus2.client.loggingListener.logTypedValue" value="true" />
    <entry key="databus2.client.connectionDefaults.eventBuffer.maxSize" value="5000000" />
    <entry key="databus2.client.connectionDefaults.eventBuffer.readBufferSize" value="102400" />
    <entry key="databus2.client.connectionDefaults.eventBuffer.scnIndexSize" value="1024000" />
    <entry key="databus2.client.connectionDefaults.eventBuffer.allocationPolicy" value="DIRECT_MEMORY" />
    <entry key="databus2.client.connectionDefaults.eventBuffer.queuePolicy" value="BLOCK_ON_WRITE" />
    <entry key="databus2.client.connectionDefaults.consumerTimeBudgetMs" value="1200000" />
    <entry key="databus2.client.connectionDefaults.pullerRetries.initSleep" value="100" />
    <entry key="databus2.client.connectionDefaults.pullerRetries.maxRetryNum" value="-1" />
    <entry key="databus2.client.runtime.relay(1).host" value="<Relay VIP Host>" />
    <entry key="databus2.client.runtime.relay(1).port" value="<Relay VIP Port>" />
    <entry key="databus2.client.runtime.relay(1).sources" value="<Comma-separated source names list>" />
    <entry key="databus2.client.container.jmx.jmxServicePort" value="9999" />
    <entry key="databus2.client.runtime.bootstrap.enabled" value="true" />
    <entry key="databus2.client.runtime.bootstrap.service(1).host" value="<Bootstrap Server VIP Host>" />
    <entry key="databus2.client.runtime.bootstrap.service(1).port" value="<Bootstrap Server VIP Port>" />
    <entry key="databus2.client.runtime.bootstrap.service(1).sources" value="<Comma-separated source names list>" />
    <entry key="databus2.client.clientCluster(1).clusterName" value="<Client Cluster Name>" /> <!-- Client cluster name. The name has to be unique for a given fabric. -->
    <entry key="databus2.client.clientCluster(1).zkAddr" value="<Colon separated ZKHOSTPORT>" /> <!-- ZK hostport configs -->
    <entry key="databus2.client.clientCluster(1).numPartitions" value="<Number of Partitions>" /> <!-- Total number of partitions for this client cluster -->
    <entry key="databus2.client.clientCluster(1).quorum" value="<Quorum Size>" /> <!-- Number of clients to be alive before partition assignment happens -->
    <entry key="databus2.client.clientCluster(1).checkpointIntervalMs" value="<Min time period in ms between persisting checkpoints - min value 5 min>" /> <!-- This is an optimization. -->
  </props>
</property>
Here is the link to REST API