Databus 2.0 Server side Filtering
This wiki captures the server side filtering design.
- For services that use range-based partitioning, the key space is divided into ‘n’ partitions. Each consumer is configured with the startId and count of the partitions it is interested in listening to.
- For services that use mod-based partitioning, there will be a predefined set of instances (the number of buckets). Each instance is assigned the bucket number that it is interested in. The key space is divided into these buckets by a mod operation.
In this approach,
- For Range Partition, Consumers specify the ranges of primary keys (KeyMin and KeyMax) that they are interested in. Filtering is done directly on the primary Keys on the relay and bootstrap Server.
- For Mod Based Partition, Consumers specify the number of buckets and the bucketIds they are interested in. The mod operation is performed on each event’s primary key in the required WindowSCNs, and events falling in the requested buckets are streamed.
- Flexible: Different Services will be able to apply their own partitioning scheme/ranges. There is no one global partitioning scheme used per source here.
- Partitioning work is repeated for each consumer. Numeric keys will have lower overhead than String keys here.
- Monitoring missed partitions is tricky here, as we need to correlate the number of events seen by the relay with the number seen by all consumers, and also make sure no two consumers are listening to duplicate ranges/buckets.
This design is chosen because of its flexibility, with the minor variations described below:
The following parameters will be added to consumer requests to Relay and Bootstrap Server:
This will be set when Filter type is range
This will be set when Filter type is range. This will be the list of partitions that the client requires to be streamed. The format can be one of three types:
- List : Example : \[1, 5, 10\]. In this case, the client requests non-contiguous partitions 1, 5 and 10 to be streamed.
- Range : Example : \[1-10\]. In this case, the client requests all the partitions in the range 1-10.
- Combo : Example \[1, 2, 5-10\]
This parameter is used when filterType = mod. This specifies the total number of buckets for partitioning.
This will be set when Filter type is mod. This will be the list of bucketIds that the client requires to be streamed. The format can be one of three types:
- List : Example : \[1, 5, 10\]. In this case, the client requests non-contiguous bucketIds 1, 5 and 10 to be streamed.
- Range : Example : \[1-10\]. In this case, the client requests all the bucketIds in the range 1-10.
- Combo : Example \[1, 2, 5-10\]
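The list, range and combo formats above can be handled by one small parser. The following is a minimal Python sketch, not the Databus implementation: `parse_id_list` is a hypothetical name, and treating a range `a-b` as half-open `[a, b)` is an assumption borrowed from the convention stated in the configuration table.

```python
from typing import List


def parse_id_list(spec: str) -> List[int]:
    """Parse a spec such as "[1, 2, 5-10]" into a sorted list of ids.

    Assumption: a range "a-b" is half-open, i.e. [a, b).
    """
    body = spec.strip()
    if body.startswith("[") and body.endswith("]"):
        body = body[1:-1]
    ids = set()
    for token in body.split(","):
        token = token.strip()
        if not token:
            continue
        if token.startswith("-"):
            raise ValueError("ids must not be negative: %r" % token)
        if "-" in token:
            # Range entry: expand every id in [lo, hi).
            lo_text, hi_text = token.split("-", 1)
            lo, hi = int(lo_text), int(hi_text)
            if lo > hi:
                raise ValueError("range start exceeds end: %r" % token)
            ids.update(range(lo, hi))
        else:
            # Single id entry.
            ids.add(int(token))
    return sorted(ids)
```

Under the half-open assumption, `parse_id_list("[1, 2, 5-10]")` yields ids 1, 2 and 5 through 9. If request ranges are meant to be inclusive, the only change needed is `range(lo, hi + 1)`.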
Since the partitioning is on the primary key, we can leverage the partially implemented KeyMin/KeyMax filtering. It is called partial because filtering on the Bootstrap serving side still needs to be implemented.
- The Consumer specifies the following parameters in its request to relay/BootstrapServer.
- FilterType = range
- Interval Size = 1M (for example)
- PartitionIds for the ranges it is interested in. For example, if the consumer is interested in ranges \[0-1M\], \[2-3M\] and \[3-4M\], then the consumer will pass the partitionIds (0,2,3) to the relay/BootstrapServer as part of its request.
- The Stream Handler in Relay converts the requested partitions to ranges and filters the events to be streamed based on those ranges.
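The conversion from partitionIds to key ranges described above can be sketched as follows. This is an illustrative Python sketch under stated assumptions, not the Stream Handler's actual code: the function names are hypothetical, primary keys are assumed to be non-negative integers, and partition `p` is assumed to cover the half-open key range `[p * size, (p + 1) * size)`.

```python
def partition_ranges(partition_ids, interval_size):
    """Map each partitionId p to the key range [p*size, (p+1)*size)."""
    if interval_size <= 0:
        raise ValueError("interval size must be positive")
    return [
        (p * interval_size, (p + 1) * interval_size)
        for p in sorted(set(partition_ids))
    ]


def in_any_range(key, ranges):
    """True if the key falls inside at least one [lo, hi) range."""
    return any(lo <= key < hi for lo, hi in ranges)


def filter_keys_by_range(keys, partition_ids, interval_size):
    """Keep only the keys that belong to the requested partitions."""
    ranges = partition_ranges(partition_ids, interval_size)
    return [k for k in keys if in_any_range(k, ranges)]
```

With an interval size of 1M and partitionIds (0, 2, 3), a key of 1,500,000 is dropped because partition 1 was not requested, while 3,999,999 is kept.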
The client communicates partitionIds instead of ranges to the relay for two reasons: optimization (future use) and a uniform interface. This design lets us filter on primary keys and on precomputed partitionIds in the DbusEvent through the same interface. For example, we could precompute partitions for certain sources in the relay and store them in the Databus events. If the partitioning function (filter type and interval range) of the consumer matches that used by the relay, then filtering can be done directly on the partitionId of the DbusEvents. For others, filtering can be done on the primary key.
- The Consumer specifies the following parameters in its request to relay/BootstrapServer.
- FilterType = mod
- Number of buckets
- BucketIds
The Stream Handler in Relay and Bootstrap Server applies the mod operation on the primary key and streams the events that fall in the requested buckets.
There will be per-source configuration for Server Side Filtering. The following are the parameters:
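As a rough illustration of the mod filter, the Python sketch below keeps only the keys whose bucket was requested. It assumes numeric primary keys; how String keys would be mapped to a number (for example by hashing) is not specified here and is left out. The function names are hypothetical.

```python
def bucket_of(key, num_buckets):
    """Return the bucket a numeric primary key falls into."""
    if num_buckets <= 0:
        raise ValueError("number of buckets must be positive")
    # Python's % always returns a value in [0, num_buckets) for a
    # positive divisor, so the result is a valid bucketId.
    return key % num_buckets


def filter_keys_by_bucket(keys, num_buckets, bucket_ids):
    """Keep only the keys whose bucket is in the requested bucketIds."""
    wanted = set(bucket_ids)
    return [k for k in keys if bucket_of(k, num_buckets) in wanted]
```

For instance, with 2 buckets and bucketIds (0), only even keys pass the filter.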
Property name | Type | Description | Validation | Default value | Example |
--- | --- | --- | --- | --- | --- |
serversidefilter.filter(source).type | String (RANGE, MOD, NONE) | The filter type for this source | Must be one of the enum values mentioned | NONE | RANGE |
serversidefilter.filter(source).range.size | Long | The size of each range. Applicable only for Range Partitioning | Must be positive | 0 | 5000000 |
serversidefilter.filter(source).range.partitions | String | The list of partitionIds to filter. Applicable only for Range Partitioning | Within a range “a-b”, a must be less than or equal to b. Also, both a and b must not be negative. The ranges used here follow the pattern [a-b) | NONE | [1,3-6] (With the size set to 5M, the ranges 5M to (10M - 1) and 15M to (30M - 1) will be filtered) |
serversidefilter.filter(source).mod.numBuckets | Long | The number of buckets. Applicable only for MOD partitioning | Must be positive | NONE | 2 |
serversidefilter.filter(source).mod.buckets | String | The list of bucketIds to filter. Applicable only for MOD partitioning | For bucket range “a-b”, both a and b must be less than numBuckets, and “a” must be less than or equal to “b”. Also, both a and b must not be negative. The ranges used here follow the pattern [a-b) | NONE | [0] (With numBuckets set to 2, all the even number keys will be filtered) |
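The validation rules in the table can be checked with a small routine like the one below. This is a hedged Python sketch, not how Databus validates its configuration: the function name, its parameters, and the representation of each “a-b” entry as an `(a, b)` tuple are all assumptions made for illustration.

```python
FILTER_TYPES = {"RANGE", "MOD", "NONE"}


def validate_filter_config(filter_type, size=0, num_buckets=None, id_ranges=()):
    """Return a list of error strings; an empty list means valid.

    id_ranges holds the "a-b" entries of the partitions or buckets
    property as (a, b) tuples (hypothetical representation).
    """
    if filter_type not in FILTER_TYPES:
        return ["unknown filter type: %r" % (filter_type,)]
    errors = []
    if filter_type == "NONE":
        return errors
    if filter_type == "RANGE" and size <= 0:
        errors.append("range.size must be positive")
    buckets_ok = num_buckets is not None and num_buckets > 0
    if filter_type == "MOD" and not buckets_ok:
        errors.append("mod.numBuckets must be positive")
    for a, b in id_ranges:
        if a < 0 or b < 0:
            errors.append("negative id in range %d-%d" % (a, b))
        if a > b:
            errors.append("range start exceeds end in %d-%d" % (a, b))
        # Table rule for MOD: both a and b must be less than numBuckets.
        if filter_type == "MOD" and buckets_ok and max(a, b) >= num_buckets:
            errors.append("bucket id out of bounds in %d-%d" % (a, b))
    return errors
```

A RANGE filter with size 5000000 and the entry `3-6` passes, while a MOD filter with numBuckets 2 and the entry `0-2` is rejected because 2 is not less than numBuckets.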