Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add data stream support to tools and use-chat #4789

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions packages/ai/core/data-stream/data-stream-writer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { JSONValue } from '@ai-sdk/provider';
import { DataStreamString } from '@ai-sdk/ui-utils';

export interface DataStreamWriter {
/**
* Appends a data part to the stream.
*/
write(data: DataStreamString): void;

/**
* Basic version of DataStreamWriter that exposes only the high-level writing methods.
* Used for client-side and tool execution where only data and annotation writing is needed.
*/
export interface BasicDataStreamWriter {
/**
* Appends a data part to the stream.
*/
Expand All @@ -16,6 +15,17 @@ export interface DataStreamWriter {
* Appends a message annotation to the stream.
*/
writeMessageAnnotation(value: JSONValue): void;
}

/**
* Full version of DataStreamWriter that includes all stream manipulation methods.
* Used internally for stream processing and merging.
*/
export interface DataStreamWriter extends BasicDataStreamWriter {
/**
* Appends a data part to the stream.
*/
write(data: DataStreamString): void;

/**
* Merges the contents of another stream to this stream.
Expand Down
15 changes: 15 additions & 0 deletions packages/ai/core/generate-text/run-tools-transformation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { LanguageModelV1StreamPart } from '@ai-sdk/provider';
import { generateId } from '@ai-sdk/ui-utils';
import { formatStreamPart } from '@ai-sdk/ui-utils';
import { Tracer } from '@opentelemetry/api';
import { ToolExecutionError } from '../../errors';
import { CoreMessage } from '../prompt/message';
import { BasicDataStreamWriter } from '../data-stream/data-stream-writer';
import { assembleOperationName } from '../telemetry/assemble-operation-name';
import { recordSpan } from '../telemetry/record-span';
import { selectTelemetryAttributes } from '../telemetry/select-telemetry-attributes';
Expand Down Expand Up @@ -133,6 +135,18 @@ export function runToolsTransformation<TOOLS extends ToolSet>({
SingleRequestTextStreamPart<TOOLS>
>,
) {
// Initialize dataStream that writes to the forward stream controller
const dataStream: BasicDataStreamWriter = {
writeData(data: unknown) {
controller.enqueue(formatStreamPart('data', [data]));
},
writeMessageAnnotation(annotation: unknown) {
controller.enqueue(
formatStreamPart('message_annotations', [annotation]),
);
},
};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at this stage we do not have a data stream so this goes against the architecture of streamText. agree with the general idea though - but the integration will not be that simple (thought about it quite a bit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah; yeah I was looking at the ai chatbot over the weekend and how it handles custom data.

Realized being able to write to a custom data stream would be pretty useful for execute and onToolCall, especially for longer tool calls with an api that supports streams.

Tried to find a good DX but yeah couldn’t exactly find best way to integrate data stream into stream text.

const chunkType = chunk.type;

switch (chunkType) {
Expand Down Expand Up @@ -212,6 +226,7 @@ export function runToolsTransformation<TOOLS extends ToolSet>({
toolCallId: toolCall.toolCallId,
messages,
abortSignal,
dataStream,
}).then(
(result: any) => {
toolResultsStreamController!.enqueue({
Expand Down
6 changes: 6 additions & 0 deletions packages/ai/core/tool/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Schema } from '@ai-sdk/ui-utils';
import { z } from 'zod';
import { ToolResultContent } from '../prompt/tool-result-content';
import { CoreMessage } from '../prompt/message';
import { BasicDataStreamWriter } from '../data-stream/data-stream-writer';

type Parameters = z.ZodTypeAny | Schema<any>;

Expand All @@ -28,6 +29,11 @@ export interface ToolExecutionOptions {
* An optional abort signal that indicates that the overall operation should be aborted.
*/
abortSignal?: AbortSignal;

/**
* The data stream writer that can be used to write data to the stream during tool execution.
*/
dataStream: BasicDataStreamWriter;
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/react/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"dependencies": {
"@ai-sdk/provider-utils": "2.1.6",
"@ai-sdk/ui-utils": "1.1.10",
"ai": "4.1.26",
"swr": "^2.2.5",
"throttleit": "2.1.0"
},
Expand Down
27 changes: 26 additions & 1 deletion packages/react/src/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
UIMessage,
UseChatOptions,
} from '@ai-sdk/ui-utils';
import type { BasicDataStreamWriter } from 'ai';
import {
callChatApi,
extractMaxToolInvocationStep,
Expand All @@ -17,8 +18,9 @@ import {
prepareAttachmentsForRequest,
shouldResubmitMessages,
updateToolCallResult,
formatDataStreamPart,
} from '@ai-sdk/ui-utils';
import { useCallback, useEffect, useRef, useState } from 'react';
import { useCallback, useEffect, useRef, useState, useMemo } from 'react';
import useSWR from 'swr';
import { throttle } from './throttle';

Expand Down Expand Up @@ -88,6 +90,9 @@ export type UseChatHelpers = {
| ((data: JSONValue[] | undefined) => JSONValue[] | undefined),
) => void;

/** The data stream writer that can be used to write data during tool execution. */
dataStream: BasicDataStreamWriter;

/** The id of the chat */
id: string;
};
Expand Down Expand Up @@ -543,6 +548,25 @@ By default, it's set to 1, which means that only a single LLM call is made.
[mutate, triggerRequest],
);

// Create a data stream writer that matches the core API (minus merge/onError)
const dataStream = useMemo<BasicDataStreamWriter>(
() => ({
writeData(data: JSONValue) {
setData(currentData => [
...(currentData || []),
formatDataStreamPart('data', [data]),
]);
},
writeMessageAnnotation(annotation: JSONValue) {
setData(currentData => [
...(currentData || []),
formatDataStreamPart('message_annotations', [annotation]),
]);
},
}),
[setData],
);

return {
messages: messages ?? [],
id: chatId,
Expand All @@ -558,6 +582,7 @@ By default, it's set to 1, which means that only a single LLM call is made.
handleInputChange,
handleSubmit,
isLoading,
dataStream,
addToolResult,
};
}
Loading