diff --git a/src/Qdrant.Client/QdrantClient.cs b/src/Qdrant.Client/QdrantClient.cs index d36d19b..11d0ff0 100644 --- a/src/Qdrant.Client/QdrantClient.cs +++ b/src/Qdrant.Client/QdrantClient.cs @@ -15,9 +15,10 @@ namespace Qdrant.Client; /// public class QdrantClient : IDisposable { - private readonly QdrantGrpcClient _grpcClient; - private readonly bool _ownsGrpcClient; + private readonly QdrantGrpcClient[] _grpcClients; + private readonly bool _ownsGrpcClients; private bool _isDisposed; + private int _nextClientIndex; private readonly Collections.CollectionsClient _collectionsClient; private readonly Points.PointsClient _pointsClient; @@ -26,6 +27,20 @@ public class QdrantClient : IDisposable private readonly TimeSpan _grpcTimeout; private readonly ILogger _logger; + /// + /// Gets the next gRPC client from the pool in a round-robin fashion. + /// + /// The next gRPC client to use for the request. + private QdrantGrpcClient GetNextClient() + { + if (_grpcClients.Length == 1) + return _grpcClients[0]; + + // Atomically increment and wrap around the counter + var index = Interlocked.Increment(ref _nextClientIndex) - 1; + return _grpcClients[index % _grpcClients.Length]; + } + /// Instantiates a new Qdrant client. /// The host to connect to. /// The port to connect to. Defaults to 6334. @@ -33,6 +48,7 @@ public class QdrantClient : IDisposable /// The API key to use. /// The timeout for gRPC calls to Qdrant; sets the gRPC deadline for all calls. /// A logger factory through which to log messages. + /// The number of connections to maintain in the connection pool. Defaults to 3. /// /// This type provides higher-level wrappers over the low-level Qdrant gRPC API. If these wrappers aren't /// sufficient, can be used instead for low-level API access. @@ -43,16 +59,19 @@ public QdrantClient( bool https = false, string? apiKey = null, TimeSpan grpcTimeout = default, - ILoggerFactory? loggerFactory = null) - : this(new UriBuilder(https ? "https" : "http", host, port).Uri, apiKey, grpcTimeout, loggerFactory) + ILoggerFactory? loggerFactory = null, + int poolSize = 3) + : this(new UriBuilder(https ? "https" : "http", host, port).Uri, apiKey, grpcTimeout, loggerFactory, poolSize) { } + /// Instantiates a new Qdrant client. /// The address to connect to. /// The API key to use. /// The timeout for gRPC calls to Qdrant; sets the gRPC deadline for all calls. /// A logger factory through which to log messages. + /// The number of connections to maintain in the connection pool. Defaults to 3. /// /// This type provides higher-level wrappers over the low-level Qdrant gRPC API. If these wrappers aren't /// sufficient, can be used instead for low-level API access. @@ -61,11 +80,40 @@ public QdrantClient( System.Uri address, string? apiKey = null, TimeSpan grpcTimeout = default, - ILoggerFactory? loggerFactory = null) - : this(new QdrantGrpcClient(address, apiKey), ownsGrpcClient: true, grpcTimeout, loggerFactory) + ILoggerFactory? loggerFactory = null, + int poolSize = 3) { + if (poolSize <= 0) + poolSize = 3; + + // Create the connection pool + _grpcClients = new QdrantGrpcClient[poolSize]; + _ownsGrpcClients = true; + + try + { + for (var i = 0; i < poolSize; i++) + { + _grpcClients[i] = new QdrantGrpcClient(address, apiKey); + } + } + catch + { + // Dispose already created clients before rethrowing + Dispose(); + throw; + } + + // Use the first client for the service clients (they will be overridden by GetNextClient in actual calls) + var firstClient = _grpcClients[0]; + _collectionsClient = firstClient.Collections; + _pointsClient = firstClient.Points; + _snapshotsClient = firstClient.Snapshots; + _grpcTimeout = grpcTimeout; + _logger = loggerFactory?.CreateLogger("Qdrant.Client") ?? NullLogger.Instance; } + /// Instantiates a new Qdrant client. /// The low-level gRPC client to use. /// The timeout for gRPC calls to Qdrant; sets the gRPC deadline for all calls. @@ -88,8 +136,8 @@ private QdrantClient( TimeSpan grpcTimeout = default, ILoggerFactory? loggerFactory = null) { - _grpcClient = grpcClient; - _ownsGrpcClient = ownsGrpcClient; + _grpcClients = [grpcClient]; + _ownsGrpcClients = ownsGrpcClient; _collectionsClient = grpcClient.Collections; _pointsClient = grpcClient.Points; @@ -271,7 +319,7 @@ private async Task CreateCollectionAsync( try { - var response = await _collectionsClient.CreateAsync(request, + var response = await GetNextClient().Collections.CreateAsync(request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) .ConfigureAwait(false); @@ -461,7 +509,7 @@ public async Task GetCollectionInfoAsync( try { - var response = await _collectionsClient.GetAsync( + var response = await GetNextClient().Collections.GetAsync( new GetCollectionInfoRequest { CollectionName = collectionName }, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1007,7 +1055,7 @@ public async Task UpsertAsync( try { - var response = await _pointsClient.UpsertAsync( + var response = await GetNextClient().Points.UpsertAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1218,7 +1266,7 @@ private async Task DeleteAsync( try { - var response = await _pointsClient.DeleteAsync( + var response = await GetNextClient().Points.DeleteAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1389,7 +1437,7 @@ public async Task> RetrieveAsync( try { - var response = await _pointsClient.GetAsync( + var response = await GetNextClient().Points.GetAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1441,7 +1489,7 @@ public async Task UpdateVectorsAsync( try { - var response = await _pointsClient.UpdateVectorsAsync( + var response = await GetNextClient().Points.UpdateVectorsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1567,7 +1615,7 @@ private async Task DeleteVectorsAsync( try { - var response = await _pointsClient.DeleteVectorsAsync( + var response = await GetNextClient().Points.DeleteVectorsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -1791,7 +1839,7 @@ private async Task SetPayloadAsync( try { - var response = await _pointsClient.SetPayloadAsync( + var response = await GetNextClient().Points.SetPayloadAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2015,7 +2063,7 @@ private async Task OverwritePayloadAsync( try { - var response = await _pointsClient.OverwritePayloadAsync( + var response = await GetNextClient().Points.OverwritePayloadAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2219,7 +2267,7 @@ private async Task DeletePayloadAsync( try { - var response = await _pointsClient.DeletePayloadAsync( + var response = await GetNextClient().Points.DeletePayloadAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2407,7 +2455,7 @@ private async Task ClearPayloadAsync( try { - var response = await _pointsClient.ClearPayloadAsync( + var response = await GetNextClient().Points.ClearPayloadAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2464,7 +2512,7 @@ public async Task CreatePayloadIndexAsync( try { - var response = await _pointsClient.CreateFieldIndexAsync( + var response = await GetNextClient().Points.CreateFieldIndexAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2511,7 +2559,7 @@ public async Task DeletePayloadIndexAsync( try { - var response = await _pointsClient.DeleteFieldIndexAsync( + var response = await GetNextClient().Points.DeleteFieldIndexAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2607,7 +2655,7 @@ public async Task> SearchAsync( try { - var response = await _pointsClient.SearchAsync( + var response = await GetNextClient().Points.SearchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2661,7 +2709,7 @@ public async Task> SearchBatchAsync( try { - var response = await _pointsClient.SearchBatchAsync( + var response = await GetNextClient().Points.SearchBatchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2770,7 +2818,7 @@ public async Task> SearchGroupsAsync( try { - var response = await _pointsClient.SearchGroupsAsync( + var response = await GetNextClient().Points.SearchGroupsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -2840,7 +2888,7 @@ public async Task ScrollAsync( try { - var response = await _pointsClient.ScrollAsync( + var response = await GetNextClient().Points.ScrollAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3102,7 +3150,7 @@ public async Task> RecommendAsync( try { - var response = await _pointsClient.RecommendAsync( + var response = await GetNextClient().Points.RecommendAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3157,7 +3205,7 @@ public async Task> RecommendBatchAsync( try { - var response = await _pointsClient.RecommendBatchAsync( + var response = await GetNextClient().Points.RecommendBatchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3437,7 +3485,7 @@ public async Task> RecommendGroupsAsync( try { - var response = await _pointsClient.RecommendGroupsAsync( + var response = await GetNextClient().Points.RecommendGroupsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3485,7 +3533,7 @@ public async Task> UpdateBatchAsync( try { - var response = await _pointsClient.UpdateBatchAsync( + var response = await GetNextClient().Points.UpdateBatchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3541,7 +3589,7 @@ public async Task CountAsync( try { - var response = await _pointsClient.CountAsync( + var response = await GetNextClient().Points.CountAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3655,7 +3703,7 @@ public async Task> DiscoverAsync( try { - var response = await _pointsClient.DiscoverAsync( + var response = await GetNextClient().Points.DiscoverAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3766,7 +3814,7 @@ public async Task> DiscoverAsync( try { - var response = await _pointsClient.DiscoverAsync( + var response = await GetNextClient().Points.DiscoverAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3815,7 +3863,7 @@ public async Task> DiscoverBatchAsync( try { - var response = await _pointsClient.DiscoverBatchAsync( + var response = await GetNextClient().Points.DiscoverBatchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3880,7 +3928,7 @@ public async Task FacetAsync( try { - var response = await _pointsClient.FacetAsync( + var response = await GetNextClient().Points.FacetAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -3947,7 +3995,7 @@ public async Task SearchMatrixPairsAsync( try { - var response = await _pointsClient.SearchMatrixPairsAsync( + var response = await GetNextClient().Points.SearchMatrixPairsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4014,7 +4062,7 @@ public async Task SearchMatrixOffsetsAsync( try { - var response = await _pointsClient.SearchMatrixOffsetsAsync( + var response = await GetNextClient().Points.SearchMatrixOffsetsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4047,7 +4095,7 @@ public async Task CreateSnapshotAsync(string collectionName try { - var response = await _snapshotsClient.CreateAsync( + var response = await GetNextClient().Snapshots.CreateAsync( new CreateSnapshotRequest { CollectionName = collectionName }, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4147,7 +4195,7 @@ public async Task> QueryAsync( try { - var response = await _pointsClient.QueryAsync( + var response = await GetNextClient().Points.QueryAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4200,7 +4248,7 @@ public async Task> QueryBatchAsync( try { - var response = await _pointsClient.QueryBatchAsync( + var response = await GetNextClient().Points.QueryBatchAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4307,7 +4355,7 @@ public async Task> QueryGroupsAsync( try { - var response = await _pointsClient.QueryGroupsAsync( + var response = await GetNextClient().Points.QueryGroupsAsync( request, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4336,7 +4384,7 @@ public async Task> ListSnapshotsAsync(string try { - var response = await _snapshotsClient.ListAsync( + var response = await GetNextClient().Snapshots.ListAsync( new ListSnapshotsRequest { CollectionName = collectionName }, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4366,7 +4414,7 @@ public async Task DeleteSnapshotAsync(string collectionName, string snapshotName try { - await _snapshotsClient.DeleteAsync( + await GetNextClient().Snapshots.DeleteAsync( new DeleteSnapshotRequest { CollectionName = collectionName, SnapshotName = snapshotName }, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4392,7 +4440,7 @@ public async Task CreateFullSnapshotAsync(CancellationToken try { - var response = await _snapshotsClient.CreateFullAsync( + var response = await GetNextClient().Snapshots.CreateFullAsync( new CreateFullSnapshotRequest(), deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4420,7 +4468,7 @@ public async Task> ListFullSnapshotsAsync(Can try { - var response = await _snapshotsClient.ListFullAsync( + var response = await GetNextClient().Snapshots.ListFullAsync( new ListFullSnapshotsRequest(), deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4449,7 +4497,7 @@ public async Task DeleteFullSnapshotAsync(string snapshotName, CancellationToken try { - await _snapshotsClient.DeleteFullAsync( + await GetNextClient().Snapshots.DeleteFullAsync( new DeleteFullSnapshotRequest { SnapshotName = snapshotName }, deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken) @@ -4583,7 +4631,7 @@ public async Task DeleteShardKeyAsync( /// /// occurs when server is unavailable public async Task HealthAsync(CancellationToken cancellationToken = default) => - await _grpcClient.Qdrant.HealthCheckAsync( + await GetNextClient().Qdrant.HealthCheckAsync( new HealthCheckRequest(), deadline: _grpcTimeout == default ? null : DateTime.UtcNow.Add(_grpcTimeout), cancellationToken: cancellationToken).ConfigureAwait(false); @@ -4646,8 +4694,13 @@ protected virtual void Dispose(bool disposing) { _isDisposed = true; - if (disposing && _ownsGrpcClient) - _grpcClient.Dispose(); + if (disposing && _ownsGrpcClients) + { + foreach (var client in _grpcClients) + { + client?.Dispose(); + } + } } } }