Skip to content

Commit

Permalink
🐛 fix: improve aysnc error type (#3638)
Browse files Browse the repository at this point in the history
* 🐛 fix: improve chunking task error

* 🐛 fix: improve chunking task error

* 🐛 fix: improve embedding task error
  • Loading branch information
arvinxx authored Aug 27, 2024
1 parent c31fc2e commit dbae456
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 57 deletions.
8 changes: 4 additions & 4 deletions src/database/server/models/asyncTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ export class AsyncTaskModel {
await serverDB
.update(asyncTasks)
.set({
error: {
body: { detail: 'chunking task is timeout, please try again' },
name: AsyncTaskErrorType.Timeout,
} as AsyncTaskError,
error: new AsyncTaskError(
AsyncTaskErrorType.Timeout,
'chunking task is timeout, please try again',
),
status: AsyncTaskStatus.Error,
})
.where(
Expand Down
2 changes: 2 additions & 0 deletions src/server/routers/async/caller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ export const createAsyncServerClient = async (userId: string, payload: JWTPayloa
Authorization: `Bearer ${serverDBEnv.KEY_VAULTS_SECRET}`,
[LOBE_CHAT_AUTH_HEADER]: await gateKeeper.encrypt(JSON.stringify({ payload, userId })),
};

if (process.env.VERCEL_AUTOMATION_BYPASS_SECRET) {
headers['x-vercel-protection-bypass'] = process.env.VERCEL_AUTOMATION_BYPASS_SECRET;
}

return createTRPCClient<AsyncRouter>({
links: [
httpBatchLink({
Expand Down
118 changes: 72 additions & 46 deletions src/server/routers/async/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ import { ModelProvider } from '@/libs/agent-runtime';
import { asyncAuthedProcedure, asyncRouter as router } from '@/libs/trpc/async';
import { S3 } from '@/server/modules/S3';
import { ChunkService } from '@/server/services/chunk';
import { AsyncTaskError, AsyncTaskErrorType, AsyncTaskStatus } from '@/types/asyncTask';
import {
AsyncTaskError,
AsyncTaskErrorType,
AsyncTaskStatus,
IAsyncTaskError,
} from '@/types/asyncTask';
import { safeParseJSON } from '@/utils/safeParseJSON';

const fileProcedure = asyncAuthedProcedure.use(async (opts) => {
Expand Down Expand Up @@ -55,10 +60,12 @@ export const fileRouter = router({
try {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject({
body: { detail: 'embedding task is timeout, please try again' },
name: AsyncTaskErrorType.Timeout,
} as AsyncTaskError);
reject(
new AsyncTaskError(
AsyncTaskErrorType.Timeout,
'embedding task is timeout, please try again',
),
);
}, ASYNC_TASK_TIMEOUT);
});

Expand All @@ -76,40 +83,47 @@ export const fileRouter = router({
const chunks = await ctx.chunkModel.getChunksTextByFileId(input.fileId);
const requestArray = chunk(chunks, CHUNK_SIZE);

await pMap(
requestArray,
async (chunks, index) => {
const agentRuntime = await initAgentRuntimeWithUserPayload(
ModelProvider.OpenAI,
ctx.jwtPayload,
);

const number = index + 1;
console.log(`执行第 ${number} 个任务`);

console.time(`任务[${number}]: embeddings`);

const embeddings = await agentRuntime.embeddings({
dimensions: 1024,
input: chunks.map((c) => c.text),
model: input.model,
});
console.timeEnd(`任务[${number}]: embeddings`);

const items: NewEmbeddingsItem[] =
embeddings?.map((e) => ({
chunkId: chunks[e.index].id,
embeddings: e.embedding,
fileId: input.fileId,
model: input.model,
})) || [];
try {
await pMap(
requestArray,
async (chunks, index) => {
const agentRuntime = await initAgentRuntimeWithUserPayload(
ModelProvider.OpenAI,
ctx.jwtPayload,
);

console.time(`任务[${number}]: insert db`);
await ctx.embeddingModel.bulkCreate(items);
console.timeEnd(`任务[${number}]: insert db`);
},
{ concurrency: CONCURRENCY },
);
const number = index + 1;
console.log(`执行第 ${number} 个任务`);

console.time(`任务[${number}]: embeddings`);

const embeddings = await agentRuntime.embeddings({
dimensions: 1024,
input: chunks.map((c) => c.text),
model: input.model,
});
console.timeEnd(`任务[${number}]: embeddings`);

const items: NewEmbeddingsItem[] =
embeddings?.map((e) => ({
chunkId: chunks[e.index].id,
embeddings: e.embedding,
fileId: input.fileId,
model: input.model,
})) || [];

console.time(`任务[${number}]: insert db`);
await ctx.embeddingModel.bulkCreate(items);
console.timeEnd(`任务[${number}]: insert db`);
},
{ concurrency: CONCURRENCY },
);
} catch (e) {
throw {
message: JSON.stringify(e),
name: AsyncTaskErrorType.EmbeddingError,
};
}

const duration = Date.now() - startAt;
// update the task status to success
Expand All @@ -125,8 +139,9 @@ export const fileRouter = router({
return await Promise.race([embeddingPromise(), timeoutPromise]);
} catch (e) {
console.error('embeddingChunks error', e);

await ctx.asyncTaskModel.update(input.taskId, {
error: e,
error: new AsyncTaskError((e as Error).name, (e as Error).message),
status: AsyncTaskStatus.Error,
});

Expand Down Expand Up @@ -175,10 +190,12 @@ export const fileRouter = router({

const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject({
body: { detail: 'chunking task is timeout, please try again' },
name: AsyncTaskErrorType.Timeout,
} as AsyncTaskError);
reject(
new AsyncTaskError(
AsyncTaskErrorType.Timeout,
'chunking task is timeout, please try again',
),
);
}, ASYNC_TASK_TIMEOUT);
});

Expand All @@ -201,6 +218,15 @@ export const fileRouter = router({

const duration = Date.now() - startAt;

// if no chunk found, throw error
if (chunks.length === 0) {
throw {
message:
'No chunk found in this file. it may due to current chunking method can not parse file accurately',
name: AsyncTaskErrorType.NoChunkError,
};
}

await ctx.chunkModel.bulkCreate(chunks, input.fileId);

if (chunkResult.unstructuredChunks) {
Expand Down Expand Up @@ -228,9 +254,9 @@ export const fileRouter = router({
} catch (e) {
const error = e as any;

const asyncTaskError: AsyncTaskError = error.body
? { body: safeParseJSON(error.body) ?? error.body, name: error.name }
: { body: { detail: error.message }, name: (error as Error).name };
const asyncTaskError = error.body
? ({ body: safeParseJSON(error.body) ?? error.body, name: error.name } as IAsyncTaskError)
: new AsyncTaskError((error as Error).name, error.message);

console.error('[Chunking Error]', asyncTaskError);
await ctx.asyncTaskModel.update(input.taskId, {
Expand Down
35 changes: 32 additions & 3 deletions src/server/services/chunk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { AsyncTaskModel } from '@/database/server/models/asyncTask';
import { FileModel } from '@/database/server/models/file';
import { ChunkContentParams, ContentChunk } from '@/server/modules/ContentChunk';
import { createAsyncServerClient } from '@/server/routers/async';
import { AsyncTaskStatus, AsyncTaskType } from '@/types/asyncTask';
import {
AsyncTaskError,
AsyncTaskErrorType,
AsyncTaskStatus,
AsyncTaskType,
} from '@/types/asyncTask';

export class ChunkService {
private userId: string;
Expand Down Expand Up @@ -40,7 +45,19 @@ export class ChunkService {
const asyncCaller = await createAsyncServerClient(this.userId, payload);

// trigger embedding task asynchronously
await asyncCaller.file.embeddingChunks.mutate({ fileId, taskId: asyncTaskId });
try {
await asyncCaller.file.embeddingChunks.mutate({ fileId, taskId: asyncTaskId });
} catch (e) {
console.error('[embeddingFileChunks] error:', e);

await this.asyncTaskModel.update(asyncTaskId, {
error: new AsyncTaskError(
AsyncTaskErrorType.TaskTriggerError,
'trigger chunk embedding async task error. Please check your app is public available or check your proxy settings is set correctly.',
),
status: AsyncTaskStatus.Error,
});
}

return asyncTaskId;
}
Expand All @@ -67,7 +84,19 @@ export class ChunkService {
const asyncCaller = await createAsyncServerClient(this.userId, payload);

// trigger parse file task asynchronously
asyncCaller.file.parseFileToChunks.mutate({ fileId: fileId, taskId: asyncTaskId });
asyncCaller.file.parseFileToChunks
.mutate({ fileId: fileId, taskId: asyncTaskId })
.catch(async (e) => {
console.error('[ParseFileToChunks] error:', e);

await this.asyncTaskModel.update(asyncTaskId, {
error: new AsyncTaskError(
AsyncTaskErrorType.TaskTriggerError,
'trigger file parse async task error. Please check your app is public available or check your proxy settings is set correctly.',
),
status: AsyncTaskStatus.Error,
});
});

return asyncTaskId;
}
Expand Down
27 changes: 23 additions & 4 deletions src/types/asyncTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,40 @@ export enum AsyncTaskStatus {
}

export enum AsyncTaskErrorType {
SDKError = 'SDKError',
EmbeddingError = 'EmbeddingError',
/**
* the chunk parse result it empty
*/
NoChunkError = 'NoChunkError',
ServerError = 'ServerError',
/**
* this happens when the task is not trigger successfully
*/
TaskTriggerError = 'TaskTriggerError',
Timeout = 'TaskTimeout',
}

export interface AsyncTaskError {
export interface IAsyncTaskError {
body: string | { detail: string };
name: string;
}

export class AsyncTaskError implements IAsyncTaskError {
constructor(name: string, message: string) {
this.name = name;
this.body = { detail: message };
}

name: string;

body: { detail: string };
}

export interface FileParsingTask {
chunkCount?: number | null;
chunkingError?: AsyncTaskError | null;
chunkingError?: IAsyncTaskError | null;
chunkingStatus?: AsyncTaskStatus | null;
embeddingError?: AsyncTaskError | null;
embeddingError?: IAsyncTaskError | null;
embeddingStatus?: AsyncTaskStatus | null;
finishEmbedding?: boolean;
}

0 comments on commit dbae456

Please sign in to comment.