Databus 2.0 Client Load Balancing Design
This page outlines the functionality, design, and some implementation details of the ‘client load balancing’ feature of the Databus V2 client library.
- The Databus V2 client already provides the ability to specify filters that are applied to the Databus V2 stream at the server (server-side filtering). However, the client is required to configure the partition settings, and this configuration is static.
- The assumption here is that the client nodes are symmetric, i.e., any node can handle any partition(s).
- The intended functionality is to:
  - dynamically assign partitions to the client nodes on startup
  - handle failure of a node by reassigning the partitions handled by the failed node(s) to the active participants
  - handle addition of client nodes with a minimum number of reassignments
  - delay assignment of partitions until a condition is met, such as a minimum number of participants having joined the cluster
  - share the checkpoints of partitions across nodes
- Ability to dynamically create the cluster if none exists.
- Ability to specify the number of partitions or tokens handled by the cluster.
- Ability to detect the addition and removal of client nodes from the cluster.
- Ability to reassign partitions (with a minimum number of reassignments) on changes to cluster membership.
- Ability to delay the initial assignment until a quorum of client nodes has joined the cluster.
- Ability for checkpoints to be accessed by any node (this constitutes shared state amongst client nodes).
- Ability for client code to be notified on events such as ‘startProcessing(partition_i)’ and ‘stopProcessing(partition_i)’ (a sketch of such a callback contract follows this list).
- Ability for client code to alter the server-side filter configuration on the fly (may not be required if a connection is created per partition).
- Ability for a client node to identify and access shared state per partition.
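To make the notification contract concrete, here is a minimal sketch of what such a per-partition callback interface could look like. The name DbusPartitionListener and its signatures are illustrative assumptions, not part of the existing Databus client API.

```java
// Hypothetical sketch of the per-partition callback contract described above.
// The interface name and method signatures are assumptions for illustration only.
public interface DbusPartitionListener
{
  /** Invoked when this node is assigned a partition; the client should start processing it. */
  void startProcessing(int partition);

  /** Invoked when a partition is reassigned away from this node; processing should stop. */
  void stopProcessing(int partition);
}
```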
Client library awareness of the relay cluster (if one exists)
This is handled by Helix’s ‘auto-rebalance’ mode, which provides dynamic creation of the cluster, consistent-hashing-based reassignment, and the ability to specify the cluster. (What is not known here is how to achieve the ‘delay initial assignment’ behavior.)
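As a rough sketch, creating the cluster and its auto-rebalanced resource with the Apache Helix admin API could look like the following. The ZooKeeper address, cluster name, resource name, and partition count are assumed configuration values, and some class/constant names (e.g. RebalanceMode.FULL_AUTO vs. the older AUTO_REBALANCE) differ across Helix versions.

```java
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;

public class LoadBalancingClusterBootstrap
{
  // zkAddr, clusterName, resourceName and numPartitions are assumed configuration values.
  public static void createClusterIfAbsent(String zkAddr, String clusterName,
                                           String resourceName, int numPartitions)
  {
    HelixAdmin admin = new ZKHelixAdmin(zkAddr);

    // No-op if the cluster already exists (second argument: do not overwrite).
    admin.addCluster(clusterName, false);

    // A simple ONLINE/OFFLINE state model is enough to express partition ownership.
    admin.addStateModelDef(clusterName, "OnlineOffline",
        new StateModelDefinition(new StateModelConfigGenerator().generateConfigForOnlineOffline()));

    // One resource whose partitions are the databus partitions/tokens. FULL_AUTO
    // (AUTO_REBALANCE in older Helix releases) lets Helix move partitions as
    // client nodes join and leave.
    admin.addResource(clusterName, resourceName, numPartitions, "OnlineOffline",
        RebalanceMode.FULL_AUTO.toString());

    // One owner per partition.
    admin.rebalance(clusterName, resourceName, 1);
  }
}
```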
- The client configuration will specify the number of tokens or partitions and the cluster name.
- The partition assignment is done at the level of registrations, i.e., a pair of (sources, filter) -> consumers.
- A new connection is made for each partition; this connection will read its checkpoint from a shared location.
- The server-side filtering configuration can be left empty initially; it is set dynamically when reacting to a partition assignment.
- The client library will use the cluster manager API.
- How is the connection started/paused? It should wait until at least one partition has been assigned; the ‘sourcesConnection’ start should be driven from the callbacks.
- Open question: it is possible that the checkpoint will be managed by the application. Otherwise, checkpoints will have to be partition-aware and shared, which means a checkpoint per partition is written to shared storage (see the sketch following this list).
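As a sketch of the shared-checkpoint direction mentioned in the open question, a partition-aware checkpoint store could be expressed as an interface like the one below. The interface and its methods are assumptions; the existing Databus checkpoint persistence is node-local, so a shared (e.g. ZooKeeper-backed) variant along these lines would be needed.

```java
// Hypothetical sketch of a partition-aware, shared checkpoint store addressing the
// open question above. The interface is an assumption, not an existing Databus API.
public interface SharedCheckpointStore
{
  /** Persist the checkpoint of one partition so that any node can later resume from it. */
  void store(String registrationId, int partition, byte[] serializedCheckpoint);

  /** Load the last stored checkpoint of one partition, or return null if none exists. */
  byte[] load(String registrationId, int partition);
}
```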
in start()
    if (!clusterManager.exists(clusterName)) {
      // no callback is invoked until 'quorum' nodes have joined the cluster
      clusterManager.create(clusterName, AUTO_REBALANCE, n, quorum);
    }
    Cluster cluster = clusterManager.get(clusterName);

    // Now the cluster state model's callbacks should be set appropriately.
    // The callback implements onPartitionAssignment(p) / onPartitionRemoval(p) and
    // onPartitionAssignment(list of p) / onPartitionRemoval(list of p).
    // In addition, the client should allow external callbacks to be set.
    client.setExternalCallback(externalCallback);

    // In our case the internal callback also keeps a handle to reg - the registration object.
    // The callback can be an internal or an external (client code) object.
    intCallback = new InternalCallback(externalCallback, client);
    cluster.setOnPartitionAssignment(intCallback);
    cluster.setOnPartitionRemoval(intCallback);

    // The internal callback dynamically creates/stops the registrations.
    internalCallback.onPartitionAssignment(list_of_partitions) {
      sources = _client.getSources();
      foreach p in list_of_partitions {
        reg = _client.register(sources, createFilter(p), _client.getConsumers(sources));
        _internalPartitionTable.add(p);
        // a connection object is made for each registration
        reg.createSourcesConnection(_client.getSourceMap());
        externalCallback.onPartitionAssignment(p, reg, reg.getSourcesConnection());
      }
    }

    internalCallback.onPartitionRemoval(list_of_partitions) {
      foreach p in list_of_partitions {
        _internalPartitionTable.remove(p);
        externalCallback.onPartitionRemoval(p, reg, reg.getSourcesConnection());
      }
    }

    // The default external callback could implement something basic, e.g. treat p as the
    // final server-side partition number.
    externalCallback.onPartitionAssignment(p, reg, sourcesConnection) {
      _internalPartitionTable.add(p);  // add the new partition
      sourcesConnection.start();
    }
    externalCallback.onPartitionRemoval(p, reg, sourcesConnection) {
      sourcesConnection.stop();
    }

    cluster.join(reg.getId()); // join with the id of the registration; is this unique across machines?
in shutdown()
    cluster.leave(reg.getId());
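The cluster.join()/cluster.leave() steps above map naturally onto joining and leaving the Helix cluster as a participant and reacting to OFFLINE→ONLINE / ONLINE→OFFLINE transitions per partition. The sketch below uses the Apache Helix participant API; the wiring of the transitions into the internal callback is an assumption, and the factory method signature differs slightly between Helix versions.

```java
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;

public class PartitionStateModelFactory
    extends StateModelFactory<PartitionStateModelFactory.PartitionStateModel>
{
  // Note: older Helix releases use createNewStateModel(String partitionName) with a
  // single argument; the two-argument form below is the newer signature.
  @Override
  public PartitionStateModel createNewStateModel(String resourceName, String partitionName)
  {
    return new PartitionStateModel(partitionName);
  }

  @StateModelInfo(initialState = "OFFLINE", states = {"ONLINE", "OFFLINE"})
  public static class PartitionStateModel extends StateModel
  {
    private final String _partitionName;

    PartitionStateModel(String partitionName)
    {
      _partitionName = partitionName;
    }

    @Transition(from = "OFFLINE", to = "ONLINE")
    public void onBecomeOnlineFromOffline(Message msg, NotificationContext ctx)
    {
      // This is where internalCallback.onPartitionAssignment(...) would be invoked
      // (the wiring to the registration is an assumption, as in the pseudocode above).
      System.out.println("assigned " + _partitionName);
    }

    @Transition(from = "ONLINE", to = "OFFLINE")
    public void onBecomeOfflineFromOnline(Message msg, NotificationContext ctx)
    {
      // This is where internalCallback.onPartitionRemoval(...) would be invoked.
      System.out.println("removed " + _partitionName);
    }
  }

  /** Join the cluster as a participant; roughly the cluster.join(...) step above. */
  public static HelixManager join(String zkAddr, String clusterName, String instanceName)
      throws Exception
  {
    HelixManager manager = HelixManagerFactory.getZKHelixManager(
        clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
    manager.getStateMachineEngine()
           .registerStateModelFactory("OnlineOffline", new PartitionStateModelFactory());
    manager.connect();
    return manager; // manager.disconnect() later corresponds to cluster.leave(...)
  }
}
```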
in dispatcher
    foreach p in sourceConn.getFilter().getPartitions() {
      storeCheckpoint(checkpoint, p);
    }
    // The CheckpointManager will hold a reference to 'cluster' and will call
    // cluster.setPropertyStore(p + regular_checkpoint_name, checkpoint);
end
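Persisting the per-partition checkpoint through the Helix property store, as the dispatcher pseudocode above suggests, could look roughly like this. ZkHelixPropertyStore and AccessOption are real Helix classes (package locations vary across Helix versions), but the path layout and the JSON serialization of the checkpoint are assumptions.

```java
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;

public class SharedCheckpointWriter
{
  private final ZkHelixPropertyStore<ZNRecord> _store; // e.g. manager.getHelixPropertyStore()

  public SharedCheckpointWriter(ZkHelixPropertyStore<ZNRecord> store)
  {
    _store = store;
  }

  /** Write the checkpoint of one partition under a per-partition path (assumed layout). */
  public void storeCheckpoint(String regId, int partition, String checkpointJson)
  {
    String path = "/checkpoints/" + regId + "/" + partition;  // assumed path scheme
    ZNRecord record = new ZNRecord(String.valueOf(partition));
    record.setSimpleField("checkpoint", checkpointJson);       // assumes a JSON-serialized checkpoint
    _store.set(path, record, AccessOption.PERSISTENT);
  }

  /** Read the checkpoint back on (re)assignment; returns null if none has been stored yet. */
  public String loadCheckpoint(String regId, int partition)
  {
    String path = "/checkpoints/" + regId + "/" + partition;
    ZNRecord record = _store.get(path, null, AccessOption.PERSISTENT);
    return (record == null) ? null : record.getSimpleField("checkpoint");
  }
}
```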
Consumer factories may be required to instantiate a new consumer for every connection. The rest of the callback API is not changed.
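A consumer factory in this setting could be as small as the following sketch; the interface name and signature are assumptions made for illustration.

```java
// Hypothetical sketch of a consumer factory. The type parameter C would typically be
// the Databus consumer interface; it is kept generic here to stay self-contained.
public interface DbusConsumerFactory<C>
{
  /** Create a fresh consumer instance for the connection serving the given partition. */
  C createConsumer(int partition);
}
```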
- V2 and V3 have near-identical registration semantics.
- No consumer-level API changes.
- The partition is an artifact of the subscription at the client; it does not figure in the event stream.
- Proliferation of connections, due to the absence of grouping of partitions.
- Client code is aware of registration objects.
- An external callback API for partition assignments has to be set.