-
-
Notifications
You must be signed in to change notification settings - Fork 675
Feat/round robin pool #4650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
FelixVaughan
wants to merge
5
commits into
nodejs:main
Choose a base branch
from
FelixVaughan:feat/round-robin-pool
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+689
−1
Open
Feat/round robin pool #4650
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
ca4d98b
init
FelixVaughan 636a318
cleanup
FelixVaughan 8d4b290
docs
FelixVaughan 566375b
added testing displaying rr, updated docs for caveats
FelixVaughan 8916027
Merge branch 'main' of github.com:nodejs/undici into feat/round-robin…
FelixVaughan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| # Class: RoundRobinPool | ||
|
|
||
| Extends: `undici.Dispatcher` | ||
|
|
||
| A pool of [Client](/docs/docs/api/Client.md) instances connected to the same upstream target with round-robin client selection. | ||
|
|
||
| Unlike [`Pool`](/docs/docs/api/Pool.md), which always selects the first available client, `RoundRobinPool` cycles through clients in a round-robin fashion. This ensures even distribution of requests across all connections, which is particularly useful when the upstream target is behind a load balancer that round-robins TCP connections across multiple backend servers (e.g., Kubernetes Services). | ||
|
|
||
| Requests are not guaranteed to be dispatched in order of invocation. | ||
|
|
||
| ## `new RoundRobinPool(url[, options])` | ||
|
|
||
| Arguments: | ||
|
|
||
| * **url** `URL | string` - It should only include the **protocol, hostname, and port**. | ||
| * **options** `RoundRobinPoolOptions` (optional) | ||
|
|
||
| ### Parameter: `RoundRobinPoolOptions` | ||
|
|
||
| Extends: [`ClientOptions`](/docs/docs/api/Client.md#parameter-clientoptions) | ||
|
|
||
| * **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Client(origin, opts)` | ||
| * **connections** `number | null` (optional) - Default: `null` - The number of `Client` instances to create. When set to `null`, the `RoundRobinPool` instance will create an unlimited amount of `Client` instances. | ||
| * **clientTtl** `number | null` (optional) - Default: `null` - The amount of time before a `Client` instance is removed from the `RoundRobinPool` and closed. When set to `null`, `Client` instances will not be removed or closed based on age. | ||
|
|
||
| ## Use Case | ||
|
|
||
| `RoundRobinPool` is designed for scenarios where: | ||
|
|
||
| 1. You connect to a single origin (e.g., `http://my-service.namespace.svc`) | ||
| 2. That origin is backed by a load balancer distributing TCP connections across multiple servers | ||
| 3. You want requests evenly distributed across all backend servers | ||
|
|
||
| **Example**: In Kubernetes, when using a Service DNS name with multiple Pod replicas, kube-proxy load balances TCP connections. `RoundRobinPool` ensures each connection (and thus each Pod) receives an equal share of requests. | ||
|
|
||
| ### Important: Backend Distribution Considerations | ||
|
|
||
| `RoundRobinPool` distributes **HTTP requests** evenly across **TCP connections**. Whether this translates to even backend server distribution depends on the load balancer's behavior: | ||
|
|
||
| **✓ Works when the load balancer**: | ||
| - Assigns different backends to different TCP connections from the same client | ||
| - Uses algorithms like: round-robin, random, least-connections (without client affinity) | ||
| - Example: Default Kubernetes Services without `sessionAffinity` | ||
|
|
||
| **✗ Does NOT work when**: | ||
| - Load balancer has client/source IP affinity (all connections from one IP → same backend) | ||
| - Load balancer uses source-IP-hash or sticky sessions | ||
|
|
||
| **How it works:** | ||
| 1. `RoundRobinPool` creates N TCP connections to the load balancer endpoint | ||
| 2. Load balancer assigns each TCP connection to a backend (per its algorithm) | ||
| 3. `RoundRobinPool` cycles HTTP requests across those N connections | ||
| 4. Result: Requests distributed proportionally to how the LB distributed the connections | ||
|
|
||
| If the load balancer assigns all connections to the same backend (e.g., due to session affinity), `RoundRobinPool` cannot overcome this. In such cases, consider using [`BalancedPool`](/docs/docs/api/BalancedPool.md) with direct backend addresses (e.g., individual pod IPs) instead of a load-balanced endpoint. | ||
|
|
||
| ## Instance Properties | ||
|
|
||
| ### `RoundRobinPool.closed` | ||
|
|
||
| Implements [Client.closed](/docs/docs/api/Client.md#clientclosed) | ||
|
|
||
| ### `RoundRobinPool.destroyed` | ||
|
|
||
| Implements [Client.destroyed](/docs/docs/api/Client.md#clientdestroyed) | ||
|
|
||
| ### `RoundRobinPool.stats` | ||
|
|
||
| Returns [`PoolStats`](PoolStats.md) instance for this pool. | ||
|
|
||
| ## Instance Methods | ||
|
|
||
| ### `RoundRobinPool.close([callback])` | ||
|
|
||
| Implements [`Dispatcher.close([callback])`](/docs/docs/api/Dispatcher.md#dispatcherclosecallback-promise). | ||
|
|
||
| ### `RoundRobinPool.destroy([error, callback])` | ||
|
|
||
| Implements [`Dispatcher.destroy([error, callback])`](/docs/docs/api/Dispatcher.md#dispatcherdestroyerror-callback-promise). | ||
|
|
||
| ### `RoundRobinPool.connect(options[, callback])` | ||
|
|
||
| See [`Dispatcher.connect(options[, callback])`](/docs/docs/api/Dispatcher.md#dispatcherconnectoptions-callback). | ||
|
|
||
| ### `RoundRobinPool.dispatch(options, handler)` | ||
|
|
||
| Implements [`Dispatcher.dispatch(options, handler)`](/docs/docs/api/Dispatcher.md#dispatcherdispatchoptions-handler). | ||
|
|
||
| ### `RoundRobinPool.pipeline(options, handler)` | ||
|
|
||
| See [`Dispatcher.pipeline(options, handler)`](/docs/docs/api/Dispatcher.md#dispatcherpipelineoptions-handler). | ||
|
|
||
| ### `RoundRobinPool.request(options[, callback])` | ||
|
|
||
| See [`Dispatcher.request(options [, callback])`](/docs/docs/api/Dispatcher.md#dispatcherrequestoptions-callback). | ||
|
|
||
| ### `RoundRobinPool.stream(options, factory[, callback])` | ||
|
|
||
| See [`Dispatcher.stream(options, factory[, callback])`](/docs/docs/api/Dispatcher.md#dispatcherstreamoptions-factory-callback). | ||
|
|
||
| ### `RoundRobinPool.upgrade(options[, callback])` | ||
|
|
||
| See [`Dispatcher.upgrade(options[, callback])`](/docs/docs/api/Dispatcher.md#dispatcherupgradeoptions-callback). | ||
|
|
||
| ## Instance Events | ||
|
|
||
| ### Event: `'connect'` | ||
|
|
||
| See [Dispatcher Event: `'connect'`](/docs/docs/api/Dispatcher.md#event-connect). | ||
|
|
||
| ### Event: `'disconnect'` | ||
|
|
||
| See [Dispatcher Event: `'disconnect'`](/docs/docs/api/Dispatcher.md#event-disconnect). | ||
|
|
||
| ### Event: `'drain'` | ||
|
|
||
| See [Dispatcher Event: `'drain'`](/docs/docs/api/Dispatcher.md#event-drain). | ||
|
|
||
| ## Example | ||
|
|
||
| ```javascript | ||
| import { RoundRobinPool } from 'undici' | ||
|
|
||
| const pool = new RoundRobinPool('http://my-service.default.svc.cluster.local', { | ||
| connections: 10 | ||
| }) | ||
|
|
||
| // Requests will be distributed evenly across all 10 connections | ||
| for (let i = 0; i < 100; i++) { | ||
| const { body } = await pool.request({ | ||
| path: '/api/data', | ||
| method: 'GET' | ||
| }) | ||
| console.log(await body.json()) | ||
| } | ||
|
|
||
| await pool.close() | ||
| ``` | ||
|
|
||
| ## See Also | ||
|
|
||
| - [Pool](/docs/docs/api/Pool.md) - Connection pool without round-robin | ||
| - [BalancedPool](/docs/docs/api/BalancedPool.md) - Load balancing across multiple origins | ||
| - [Issue #3648](https://github.com/nodejs/undici/issues/3648) - Original issue describing uneven distribution | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| 'use strict' | ||
|
|
||
| const { | ||
| PoolBase, | ||
| kClients, | ||
| kNeedDrain, | ||
| kAddClient, | ||
| kGetDispatcher, | ||
| kRemoveClient | ||
| } = require('./pool-base') | ||
| const Client = require('./client') | ||
| const { | ||
| InvalidArgumentError | ||
| } = require('../core/errors') | ||
| const util = require('../core/util') | ||
| const { kUrl } = require('../core/symbols') | ||
| const buildConnector = require('../core/connect') | ||
|
|
||
| const kOptions = Symbol('options') | ||
| const kConnections = Symbol('connections') | ||
| const kFactory = Symbol('factory') | ||
| const kIndex = Symbol('index') | ||
|
|
||
| function defaultFactory (origin, opts) { | ||
| return new Client(origin, opts) | ||
| } | ||
|
|
||
| class RoundRobinPool extends PoolBase { | ||
| constructor (origin, { | ||
| connections, | ||
| factory = defaultFactory, | ||
| connect, | ||
| connectTimeout, | ||
| tls, | ||
| maxCachedSessions, | ||
| socketPath, | ||
| autoSelectFamily, | ||
| autoSelectFamilyAttemptTimeout, | ||
| allowH2, | ||
| clientTtl, | ||
| ...options | ||
| } = {}) { | ||
| if (connections != null && (!Number.isFinite(connections) || connections < 0)) { | ||
| throw new InvalidArgumentError('invalid connections') | ||
| } | ||
|
|
||
| if (typeof factory !== 'function') { | ||
| throw new InvalidArgumentError('factory must be a function.') | ||
| } | ||
|
|
||
| if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') { | ||
| throw new InvalidArgumentError('connect must be a function or an object') | ||
| } | ||
|
|
||
| if (typeof connect !== 'function') { | ||
| connect = buildConnector({ | ||
| ...tls, | ||
| maxCachedSessions, | ||
| allowH2, | ||
| socketPath, | ||
| timeout: connectTimeout, | ||
| ...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined), | ||
| ...connect | ||
| }) | ||
| } | ||
|
|
||
| super() | ||
|
|
||
| this[kConnections] = connections || null | ||
| this[kUrl] = util.parseOrigin(origin) | ||
| this[kOptions] = { ...util.deepClone(options), connect, allowH2, clientTtl } | ||
| this[kOptions].interceptors = options.interceptors | ||
| ? { ...options.interceptors } | ||
| : undefined | ||
| this[kFactory] = factory | ||
| this[kIndex] = -1 | ||
|
|
||
| this.on('connect', (origin, targets) => { | ||
| if (clientTtl != null && clientTtl > 0) { | ||
| for (const target of targets) { | ||
| Object.assign(target, { ttl: Date.now() }) | ||
| } | ||
| } | ||
| }) | ||
|
|
||
| this.on('connectionError', (origin, targets, error) => { | ||
| for (const target of targets) { | ||
| const idx = this[kClients].indexOf(target) | ||
| if (idx !== -1) { | ||
| this[kClients].splice(idx, 1) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| [kGetDispatcher] () { | ||
| const clientTtlOption = this[kOptions].clientTtl | ||
| const clientsLength = this[kClients].length | ||
|
|
||
| // If we have no clients yet, create one | ||
| if (clientsLength === 0) { | ||
| const dispatcher = this[kFactory](this[kUrl], this[kOptions]) | ||
| this[kAddClient](dispatcher) | ||
| return dispatcher | ||
| } | ||
|
|
||
| // Round-robin through existing clients | ||
| let checked = 0 | ||
| while (checked < clientsLength) { | ||
| this[kIndex] = (this[kIndex] + 1) % clientsLength | ||
| const client = this[kClients][this[kIndex]] | ||
|
|
||
| // Check if client is stale (TTL expired) | ||
| if (clientTtlOption != null && clientTtlOption > 0 && client.ttl && ((Date.now() - client.ttl) > clientTtlOption)) { | ||
| this[kRemoveClient](client) | ||
| checked++ | ||
| continue | ||
| } | ||
|
|
||
| // Return client if it's not draining | ||
| if (!client[kNeedDrain]) { | ||
| return client | ||
| } | ||
|
|
||
| checked++ | ||
| } | ||
|
|
||
| // All clients are busy, create a new one if we haven't reached the limit | ||
| if (!this[kConnections] || clientsLength < this[kConnections]) { | ||
| const dispatcher = this[kFactory](this[kUrl], this[kOptions]) | ||
| this[kAddClient](dispatcher) | ||
| return dispatcher | ||
| } | ||
| } | ||
| } | ||
|
|
||
| module.exports = RoundRobinPool | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can also provide a similar approach to
BalancedPoolto account for faulty upstreams (Clients) that are misbehaving and possibly re-balance distribution accordingly to implementer optionsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Plus one for this. Would be happy to revisit and see what we can do