Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
MEILI_NO_ANALYTICS: 'true'
ports:
- '7700:7700'
options: "--add-host host.docker.internal=host-gateway"
strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from "./types/index.js";
export * from "./errors/index.js";
export * from "./indexes.js";
export * from "./task/webhook-task.js";
import { MeiliSearch } from "./meilisearch.js";

/**
Expand Down
14 changes: 11 additions & 3 deletions src/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import {
getHttpRequestsWithEnqueuedTaskPromise,
TaskClient,
type HttpRequestsWithEnqueuedTaskPromise,
} from "./task.js";
} from "./task/task.js";

export class Index<T extends RecordAny = RecordAny> {
uid: string;
Expand All @@ -78,7 +78,11 @@ export class Index<T extends RecordAny = RecordAny> {
this.uid = uid;
this.primaryKey = primaryKey;
this.httpRequest = new HttpRequests(config);
this.tasks = new TaskClient(this.httpRequest, config.defaultWaitOptions);
this.tasks = new TaskClient(
this.httpRequest,
config.webhookTaskClient,
config.defaultWaitOptions,
);
this.#httpRequestsWithTask = getHttpRequestsWithEnqueuedTaskPromise(
this.httpRequest,
this.tasks,
Expand Down Expand Up @@ -244,7 +248,11 @@ export class Index<T extends RecordAny = RecordAny> {
const httpRequests = new HttpRequests(config);
return getHttpRequestsWithEnqueuedTaskPromise(
httpRequests,
new TaskClient(httpRequests),
new TaskClient(
httpRequests,
config.webhookTaskClient,
config.defaultWaitOptions,
),
).post({
path: "indexes",
body: { ...options, uid },
Expand Down
15 changes: 11 additions & 4 deletions src/meilisearch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import {
getHttpRequestsWithEnqueuedTaskPromise,
TaskClient,
type HttpRequestsWithEnqueuedTaskPromise,
} from "./task.js";
import { BatchClient } from "./batch.js";
} from "./task/task.js";
import { BatchClient } from "./task/batch.js";
import { ChatWorkspace } from "./chat-workspace.js";
import type { MeiliSearchApiError } from "./errors/index.js";

Expand Down Expand Up @@ -73,6 +73,7 @@ export class MeiliSearch {

this.#taskClient = new TaskClient(
this.httpRequest,
config.webhookTaskClient,
config.defaultWaitOptions,
);
this.#batchClient = new BatchClient(this.httpRequest);
Expand Down Expand Up @@ -473,8 +474,14 @@ export class MeiliSearch {
*
* @returns Promise returning an object with health details
*/
async health(): Promise<Health> {
return await this.httpRequest.get<Health>({ path: "health" });
async health(
// TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476
extraRequestInit?: ExtraRequestInit,
): Promise<Health> {
return await this.httpRequest.get<Health>({
path: "health",
extraRequestInit,
});
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/batch.ts → src/task/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type {
Batch,
BatchesResults,
TasksOrBatchesQuery,
} from "./types/index.js";
import type { HttpRequests } from "./http-requests.js";
} from "../types/index.js";
import type { HttpRequests } from "../http-requests.js";

/**
* Class for handling batches.
Expand Down
81 changes: 51 additions & 30 deletions src/task.ts → src/task/task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { MeiliSearchTaskTimeOutError } from "./errors/index.js";
import { MeiliSearchTaskTimeOutError } from "../errors/index.js";
import type { WebhookTaskClient } from "./webhook-task.js";
import type {
WaitOptions,
TasksOrBatchesQuery,
Expand All @@ -9,14 +10,14 @@ import type {
EnqueuedTaskPromise,
TaskUidOrEnqueuedTask,
ExtraRequestInit,
} from "./types/index.js";
import type { HttpRequests } from "./http-requests.js";
} from "../types/index.js";
import type { HttpRequests } from "../http-requests.js";

/**
* Used to identify whether an error is a timeout error in
* {@link TaskClient.waitForTask}.
*/
const TIMEOUT_ID = Symbol("<task timeout>");
export const TIMEOUT_ID = Symbol("<task timeout>");

/**
* @returns A function which defines an extra function property on a
Expand Down Expand Up @@ -58,31 +59,11 @@ export class TaskClient {
readonly #httpRequest: HttpRequests;
readonly #defaultTimeout: number;
readonly #defaultInterval: number;
readonly #applyWaitTask: ReturnType<typeof getWaitTaskApplier>;

constructor(httpRequest: HttpRequests, defaultWaitOptions?: WaitOptions) {
this.#httpRequest = httpRequest;
this.#defaultTimeout = defaultWaitOptions?.timeout ?? 5_000;
this.#defaultInterval = defaultWaitOptions?.interval ?? 50;
this.#applyWaitTask = getWaitTaskApplier(this);
}

/** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-one-task} */
async getTask(
uid: number,
// TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476
extraRequestInit?: ExtraRequestInit,
): Promise<Task> {
return await this.#httpRequest.get({
path: `tasks/${uid}`,
extraRequestInit,
});
}

/** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-tasks} */
async getTasks(params?: TasksOrBatchesQuery): Promise<TasksResults> {
return await this.#httpRequest.get({ path: "tasks", params });
}
readonly #applyWaitTask = getWaitTaskApplier(this);
readonly waitForTask: (
taskUidOrEnqueuedTask: TaskUidOrEnqueuedTask,
options?: WaitOptions,
) => Promise<Task>;

/**
* Wait for an enqueued task to be processed. This is done through polling
Expand All @@ -93,7 +74,7 @@ export class TaskClient {
* to instead use {@link EnqueuedTaskPromise.waitTask}, which is available on
* any method that returns an {@link EnqueuedTaskPromise}.
*/
async waitForTask(
async #waitForTask(
taskUidOrEnqueuedTask: TaskUidOrEnqueuedTask,
options?: WaitOptions,
): Promise<Task> {
Expand Down Expand Up @@ -128,6 +109,46 @@ export class TaskClient {
}
}

constructor(
httpRequest: HttpRequests,
webhookTaskClient?: WebhookTaskClient,
options?: WaitOptions,
) {
this.#httpRequest = httpRequest;

// TODO: Timeout error is only caught for private method
this.waitForTask =
webhookTaskClient !== undefined
? (taskUidOrEnqueuedTask, options) => {
const taskUid = getTaskUid(taskUidOrEnqueuedTask);
return webhookTaskClient.waitForTask(
taskUid,
options?.timeout ?? this.#defaultTimeout,
);
}
: this.#waitForTask.bind(this);

this.#defaultTimeout = options?.timeout ?? 5_000;
this.#defaultInterval = options?.interval ?? 50;
}

/** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-one-task} */
async getTask(
uid: number,
// TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476
extraRequestInit?: ExtraRequestInit,
): Promise<Task> {
return await this.#httpRequest.get({
path: `tasks/${uid}`,
extraRequestInit,
});
}

/** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-tasks} */
async getTasks(params?: TasksOrBatchesQuery): Promise<TasksResults> {
return await this.#httpRequest.get({ path: "tasks", params });
}

/**
* Lazily wait for multiple enqueued tasks to be processed.
*
Expand Down
97 changes: 97 additions & 0 deletions src/task/webhook-task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type {
EnqueuedTask,
Task,
WebhookTaskClientOptions,
} from "../types/index.js";
import { TIMEOUT_ID } from "./task.js";

type TimeoutID = ReturnType<typeof setTimeout>;
type TaskUid = EnqueuedTask["taskUid"];

function* parseNDJSONTasks(tasksString: string) {
if (tasksString === "") {
return;
}

let newLineIndex: number | undefined = undefined;
for (;;) {
const lastIndexPlusOneOrZero =
newLineIndex === undefined ? 0 : newLineIndex + 1;
newLineIndex = tasksString.indexOf("\n", lastIndexPlusOneOrZero);

if (newLineIndex === -1) {
newLineIndex = undefined;
}

yield JSON.parse(
tasksString.substring(lastIndexPlusOneOrZero, newLineIndex),
) as Task;

if (newLineIndex === undefined) {
return;
}
}
}

export class WebhookTaskClient {
readonly #taskMap = new Map<TaskUid, (task: Task) => void>();
readonly #orphanTaskMap = new Map<
TaskUid,
{ task: Task; timeoutId?: TimeoutID }
>();
readonly #timeout: number;
readonly #timeoutCallback: (task: Task) => void;

constructor(options?: WebhookTaskClientOptions) {
this.#timeout = options?.timeout ?? 30_000;
this.#timeoutCallback =
options?.timeoutCallback ??
((task) => console.error("unclaimed orphan task", task));
}

pushTasksString(tasksString: string): void {
for (const task of parseNDJSONTasks(tasksString)) {
const callback = this.#taskMap.get(task.uid);

if (callback !== undefined) {
this.#taskMap.delete(task.uid);
callback(task);

return;
}

const timeoutId = setTimeout(() => {
this.#orphanTaskMap.delete(task.uid);
this.#timeoutCallback(task);
}, this.#timeout);

this.#orphanTaskMap.set(task.uid, { task, timeoutId });
}
}

async waitForTask(taskUid: TaskUid, timeout?: number): Promise<Task> {
const orphan = this.#orphanTaskMap.get(taskUid);

if (orphan !== undefined) {
clearTimeout(orphan.timeoutId);
return orphan.task;
}

let to: TimeoutID | undefined = undefined;

const task = await new Promise<Task>((resolve, reject) => {
this.#taskMap.set(taskUid, resolve);
to = setTimeout(() => {
// TODO: This should be the same as in TaskClient
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
reject(TIMEOUT_ID);
}, timeout);
});

clearTimeout(to);

return task;
}

// TODO: destroy method -> for every orphaned task call error method and clear timeouts
}
2 changes: 2 additions & 0 deletions src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// Definitions: https://github.com/meilisearch/meilisearch-js
// TypeScript Version: ^5.8.2

import type { WebhookTaskClient } from "../task/webhook-task.js";
import type { WaitOptions } from "./task_and_batch.js";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -78,6 +79,7 @@ export type Config = {
timeout?: number;
/** Customizable default options for awaiting tasks. */
defaultWaitOptions?: WaitOptions;
webhookTaskClient?: WebhookTaskClient;
};

/** Main options of a request. */
Expand Down
7 changes: 7 additions & 0 deletions src/types/webhooks.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Task } from "./task_and_batch.js";

export type Webhook = {
/** A v4 uuid Meilisearch automatically generates when you create a new webhook */
uuid: string;
Expand Down Expand Up @@ -25,3 +27,8 @@ export type WebhookUpdatePayload = {
/** An object with HTTP headers and their values */
headers?: Record<string, string>;
};

export type WebhookTaskClientOptions = {
timeout?: number;
timeoutCallback?: (task: Task) => void;
};
Loading
Loading