Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clients): add saveObjects, deleteObjects and partialUpdateObjects helpers #3180

Merged
merged 21 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,50 @@ private static int NextDelay(int retryCount)
return Math.Min(retryCount * 200, 5000);
}

/// <summary>
/// Helper: Saves the given array of objects in the given index. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objects each.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T">The type of the record objects to index; must be a reference type serializable by the client.</typeparam>
public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEnumerable<T> objects,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Helper: Deletes every record for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objectIDs each.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
public async Task<List<BatchResponse>> DeleteObjects(string indexName, IEnumerable<string> objectIDs,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
// Each deletion is expressed as a minimal `{ objectID }` record, the shape the batch delete action expects.
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objects each.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to update in the given Algolia `indexName`.</param>
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T">The type of the record objects to update; must be a reference type serializable by the client.</typeparam>
public async Task<List<BatchResponse>> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, 1000, options, cancellationToken).ConfigureAwait(false);
}

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Func<TU, bool> stopCondition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.ktor.util.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.*
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
Expand Down Expand Up @@ -316,6 +317,80 @@ public suspend fun SearchClient.chunkedBatch(
return tasks
}

/**
 * Helper: Saves the given array of objects in the given index. Delegates to the [chunkedBatch]
 * helper, which splits the payload into `batch` requests of at most 1000 objects each.
 *
 * @param indexName The index in which to perform the request.
 * @param objects The list of objects to index.
 * @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
 * @return The list of responses from the batch requests.
 *
 */
public suspend fun SearchClient.saveObjects(
    indexName: String,
    objects: List<JsonObject>,
    requestOptions: RequestOptions? = null,
): List<BatchResponse> = chunkedBatch(
    indexName = indexName,
    objects = objects,
    action = Action.AddObject,
    waitForTask = false,
    batchSize = 1000,
    requestOptions = requestOptions,
)

/**
 * Helper: Deletes every record for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objectIDs each.
 *
 * @param indexName The index in which to perform the request.
 * @param objectIDs The list of objectIDs to delete from the index.
 * @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
 * @return The list of responses from the batch requests.
 *
 */
public suspend fun SearchClient.deleteObjects(
    indexName: String,
    objectIDs: List<String>,
    requestOptions: RequestOptions? = null,
): List<BatchResponse> {
    return this.chunkedBatch(
        indexName = indexName,
        // Each deletion is a minimal record carrying only its objectID; buildJsonObject
        // constructs it directly, without going through serializer inference.
        objects = objectIDs.map { id -> buildJsonObject { put("objectID", id) } },
        action = Action.DeleteObject,
        waitForTask = false,
        batchSize = 1000,
        requestOptions = requestOptions,
    )
}

/**
 * Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objects each.
 *
 * @param indexName The index in which to perform the request.
 * @param objects The list of objects to update in the index.
 * @param createIfNotExists To be provided if non-existing objects are passed, otherwise, the call will fail.
 * @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
 * @return The list of responses from the batch requests.
 *
 */
public suspend fun SearchClient.partialUpdateObjects(
    indexName: String,
    objects: List<JsonObject>,
    createIfNotExists: Boolean,
    requestOptions: RequestOptions? = null,
): List<BatchResponse> {
    return this.chunkedBatch(
        indexName = indexName,
        objects = objects,
        action = if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate,
        waitForTask = false,
        batchSize = 1000,
        requestOptions = requestOptions,
    )
}

/**
* Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ package object extension {
*
* @param indexName
* The index in which to perform the request.
* @param records
* The list of records to replace.
* @param objects
* The list of objects to save.
* @param action
* The action to perform on the records.
* The action to perform on the objects.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
Expand All @@ -212,14 +212,14 @@ package object extension {
*/
def chunkedBatch(
indexName: String,
records: Seq[Any],
objects: Seq[Any],
action: Action = Action.AddObject,
waitForTasks: Boolean,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
var futures = Seq.empty[Future[BatchResponse]]
records.grouped(batchSize).foreach { chunk =>
objects.grouped(batchSize).foreach { chunk =>
val requests = chunk.map { record =>
BatchRequest(action = action, body = record)
}
Expand All @@ -244,6 +244,66 @@ package object extension {
responses
}

/** Helper: Saves the given array of objects in the given index. Delegates to the `chunkedBatch` helper, which splits the payload into `batch` requests of at most 1000 objects each.
  *
  * @param indexName
  *   The index in which to perform the request.
  * @param objects
  *   The list of objects to save.
  * @param requestOptions
  *   Additional request configuration.
  * @return
  *   A future containing the response of the batch operations.
  */
def saveObjects(
    indexName: String,
    objects: Seq[Any],
    requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] =
  chunkedBatch(
    indexName = indexName,
    objects = objects,
    action = Action.AddObject,
    waitForTasks = false,
    batchSize = 1000,
    requestOptions = requestOptions
  )

/** Helper: Deletes every record for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objectIDs each.
  *
  * @param indexName
  *   The index in which to perform the request.
  * @param objectIDs
  *   The list of objectIDs to delete.
  * @param requestOptions
  *   Additional request configuration.
  * @return
  *   A future containing the response of the batch operations.
  */
def deleteObjects(
    indexName: String,
    objectIDs: Seq[String],
    requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
  // A plain Map serializes to the `{"objectID": "..."}` JSON shape the batch API expects.
  // The previous anonymous structural type (`new { val objectID = id }`) relies on reflective
  // access and is not reliably handled by reflection-based JSON mappers.
  chunkedBatch(indexName, objectIDs.map(id => Map("objectID" -> id)), Action.DeleteObject, false, 1000, requestOptions)
}

/** Helper: Replaces object content of all the given objects according to their respective `objectID` field. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objects each.
  *
  * @param indexName
  *   The index in which to perform the request.
  * @param objects
  *   The list of objects to update.
  * @param createIfNotExists
  *   To be provided if non-existing objects are passed, otherwise, the call will fail.
  * @param requestOptions
  *   Additional request configuration.
  * @return
  *   A future containing the response of the batch operations.
  */
def partialUpdateObjects(
    indexName: String,
    objects: Seq[Any],
    createIfNotExists: Boolean,
    requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
  chunkedBatch(indexName, objects, if (createIfNotExists) Action.PartialUpdateObject else Action.PartialUpdateObjectNoCreate, false, 1000, requestOptions)
}

/** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime. Internally, this method copies the existing index
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
Expand All @@ -254,8 +314,8 @@ package object extension {
*
* @param indexName
* The index in which to perform the request.
* @param records
* The list of records to replace.
* @param objects
* The list of objects to replace.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
Expand All @@ -265,11 +325,11 @@ package object extension {
*/
def replaceAllObjects(
indexName: String,
records: Seq[Any],
objects: Seq[Any],
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val requests = records.map { record =>
val requests = objects.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"
Expand All @@ -287,7 +347,7 @@ package object extension {

batchResponses <- chunkedBatch(
indexName = tmpIndexName,
records = records,
objects = objects,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,71 @@ public extension SearchClient {
return responses
}

/// Helper: Saves the given array of objects in the given index. The `chunkedBatch` helper is used under the hood,
/// which creates `batch` requests with at most 1000 objects each.
/// - parameter indexName: The name of the index where to save the objects
/// - parameter objects: The new objects
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func saveObjects(
    indexName: String,
    objects: [some Encodable],
    requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
    try await self.chunkedBatch(
        indexName: indexName,
        objects: objects,
        action: .addObject,
        waitForTasks: false,
        batchSize: 1000,
        requestOptions: requestOptions
    )
}

/// Helper: Deletes every record for the given objectIDs. The `chunkedBatch` helper is used under the hood, which
/// creates `batch` requests with at most 1000 objectIDs each.
/// - parameter indexName: The name of the index to delete objectIDs from
/// - parameter objectIDs: The objectIDs to delete
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func deleteObjects(
    indexName: String,
    objectIDs: [String],
    requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
    try await self.chunkedBatch(
        indexName: indexName,
        // Each deletion is a minimal `{"objectID": ...}` record, the shape the batch delete action expects.
        objects: objectIDs.map { AnyCodable(["objectID": $0]) },
        action: .deleteObject,
        waitForTasks: false,
        batchSize: 1000,
        requestOptions: requestOptions
    )
}

/// Helper: Replaces object content of all the given objects according to their respective `objectID` field. The
/// `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objects each.
/// - parameter indexName: The name of the index where to update the objects
/// - parameter objects: The objects to update
/// - parameter createIfNotExist: To be provided if non-existing objects are passed, otherwise, the call will fail.
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
/// NOTE(review): the parameter is spelled `createIfNotExist` (no trailing `s`), unlike the other language clients;
/// renaming it would break source compatibility for Swift callers, so it is left as-is.
func partialUpdateObjects(
    indexName: String,
    objects: [some Encodable],
    createIfNotExist: Bool = false,
    requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
    try await self.chunkedBatch(
        indexName: indexName,
        objects: objects,
        action: createIfNotExist ? .partialUpdateObject : .partialUpdateObjectNoCreate,
        waitForTasks: false,
        batchSize: 1000,
        requestOptions: requestOptions
    )
}

/// Replace all objects in an index
///
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation
Expand Down
35 changes: 35 additions & 0 deletions specs/search/helpers/deleteObjects.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
method:
  post:
    x-helper: true
    tags:
      - Records
    operationId: deleteObjects
    summary: Deletes every record for the given objectIDs
    description: |
      Helper: Deletes every record for the given objectIDs. The `chunkedBatch` helper is used under the hood, which creates `batch` requests with at most 1000 objectIDs each.
    parameters:
      - in: query
        name: indexName
        description: The `indexName` to delete `objectIDs` from.
        required: true
        schema:
          type: string
      - in: query
        name: objectIDs
        description: The objectIDs to delete.
        required: true
        schema:
          type: array
          items:
            type: string
    responses:
      '200':
        description: OK
        content:
          application/json:
            schema:
              type: array
              items:
                $ref: '../paths/objects/common/schemas.yml#/batchResponse'
      '400':
        $ref: '../../common/responses/IndexNotFound.yml'
Loading
Loading