Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions DATA_STREAMING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Live Data Streaming

The live data streaming feature allows visualization of real-time text updates across multiple streams. This is useful for monitoring ongoing processes or displaying live transcription or streaming data.

## API

The `/api/update-data-stream` endpoint provides functionality for managing live text streams and finalized data entries:

- **POST**: Submit live text updates or finalized entries
- `text`: The text content to stream
- `stream_id`: Identifier for the data stream (defaults to 'default')
- `timestamp`: Unix timestamp (defaults to current time)
- `finalized`: Boolean flag to mark entry as finalized
- `uuid`: Backend UUID for database tracking

- **GET**: Retrieve live or finalized data
- Query `?type=finalized` for processed entries
- Query `?stream=<stream_id>` for specific stream data
- No query parameters returns all live streams

- **PATCH**: Update entry processing status using UUID

## Data Stream Display

The chat interface includes a "Data Stream Display" toggle in the header menu that enables real-time visualization of streaming data alongside chat conversations. This feature is particularly useful for monitoring live transcription feeds or processing status updates.

## Database Watcher

Database entries are created when data is submitted to the `/api/update-data-stream` endpoint with the `finalized` field set to `true`. These entries represent completed or processed data that should be persisted to a database system.

NOTE: This API just provides visualization and does not manage a database itself. The assumption is that the user is using this API when manually updating a database.
17 changes: 17 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ RUN npm i
FROM base AS builder
WORKDIR /app

# Accept build arguments for NEXT_PUBLIC_ environment variables
ARG NEXT_PUBLIC_HTTP_CHAT_COMPLETION_URL
ARG NEXT_PUBLIC_WEBSOCKET_CHAT_COMPLETION_URL
ARG NEXT_PUBLIC_SHOW_DATA_STREAM_DEFAULT_ON
ARG NEXT_PUBLIC_WORKFLOW
ARG NEXT_PUBLIC_CHAT_HISTORY_DEFAULT_ON
ARG NEXT_PUBLIC_WEB_SOCKET_DEFAULT_ON
ARG NEXT_PUBLIC_ENABLE_INTERMEDIATE_STEPS

# Set them as environment variables for the build
ENV NEXT_PUBLIC_HTTP_CHAT_COMPLETION_URL=$NEXT_PUBLIC_HTTP_CHAT_COMPLETION_URL
ENV NEXT_PUBLIC_WEBSOCKET_CHAT_COMPLETION_URL=$NEXT_PUBLIC_WEBSOCKET_CHAT_COMPLETION_URL
ENV NEXT_PUBLIC_SHOW_DATA_STREAM_DEFAULT_ON=$NEXT_PUBLIC_SHOW_DATA_STREAM_DEFAULT_ON
ENV NEXT_PUBLIC_WORKFLOW=$NEXT_PUBLIC_WORKFLOW
ENV NEXT_PUBLIC_CHAT_HISTORY_DEFAULT_ON=$NEXT_PUBLIC_CHAT_HISTORY_DEFAULT_ON
ENV NEXT_PUBLIC_WEB_SOCKET_DEFAULT_ON=$NEXT_PUBLIC_WEB_SOCKET_DEFAULT_ON
ENV NEXT_PUBLIC_ENABLE_INTERMEDIATE_STEPS=$NEXT_PUBLIC_ENABLE_INTERMEDIATE_STEPS

COPY --from=deps /app/node_modules ./node_modules
COPY . .
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,16 @@ NOTE: Most of the time, you will want to select /chat/stream for intermediate re
- /generate/stream - Streaming response generation
- /chat - Single response chat completion
- /chat/stream - Streaming chat completion
- /chat/ca-rag - Single response chat completion when using [Context-Aware RAG](https://github.com/NVIDIA/context-aware-rag) backend
- `WebSocket URL for Completion`: WebSocket URL to connect to running NeMo Agent Toolkit server
- `WebSocket Schema`: Workflow schema type over WebSocket connection

### Live Data Streaming

The live data streaming feature allows visualization of real-time text updates across multiple streams. This is useful for monitoring ongoing processes or displaying live transcription or streaming data.

For more detail, see the [README for live data streaming](DATA_STREAMING.md).

## Usage Examples

### Getting Started Example
Expand Down
2 changes: 1 addition & 1 deletion components/Avatar/SystemAvatar.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IconPasswordUser, IconUserPentagon } from '@tabler/icons-react';
import { IconPasswordUser } from '@tabler/icons-react';
import React from 'react';

export const SystemAgentAvatar = ({ height = 7, width = 7 }) => {
Expand Down
2 changes: 0 additions & 2 deletions components/Avatar/UserAvatar.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import React from 'react';

import { getInitials } from '@/utils/app/helper';

export const UserAvatar = ({ src = '', height = 30, width = 30 }) => {
const profilePicUrl = src || ``;

Expand Down
49 changes: 19 additions & 30 deletions components/Chat/Chat.tsx
Original file line number Diff line number Diff line change
@@ -1,39 +1,32 @@

'use client';

import { ChatHeader } from './ChatHeader';
import { ChatInput } from './ChatInput';
import { ChatLoader } from './ChatLoader';
import { MemoizedChatMessage } from './MemoizedChatMessage';
import { v4 as uuidv4 } from 'uuid';
import toast from 'react-hot-toast';
import { useCallback, useContext, useEffect, useRef, useState } from 'react';

import { InteractionModal } from '@/components/Chat/ChatInteractionMessage';
import HomeContext from '@/pages/api/home/home.context';
import { ChatBody, Conversation, Message } from '@/types/chat';
import {
WebSocketInbound,
validateWebSocketMessage,
validateWebSocketMessageWithConversationId,
validateConversationId,
isSystemResponseMessage,
isSystemIntermediateMessage,
isSystemInteractionMessage,
isErrorMessage,
isSystemResponseInProgress,
isSystemResponseComplete,
isOAuthConsentMessage,
extractOAuthUrl,
shouldAppendResponseContent,
} from '@/types/websocket';
import { getEndpoint } from '@/utils/app/api';
import { webSocketMessageTypes } from '@/utils/app/const';
import {
saveConversation,
saveConversations,
updateConversation,
} from '@/utils/app/conversation';
import {
fetchLastMessage,
processIntermediateMessage,
updateConversationTitle,
} from '@/utils/app/helper';
import {
shouldAppendResponse,
Expand All @@ -45,14 +38,11 @@ import {
shouldRenderAssistantMessage,
} from '@/utils/chatTransform';
import { throttle } from '@/utils/data/throttle';
import { useTranslation } from 'next-i18next';
import { useCallback, useContext, useEffect, useMemo, useRef, useState } from 'react';
import toast from 'react-hot-toast';
import { v4 as uuidv4 } from 'uuid';

import { SESSION_COOKIE_NAME } from '@/constants/constants';


import { ChatInput } from './ChatInput';
import { ChatLoader } from './ChatLoader';
import { MemoizedChatMessage } from './MemoizedChatMessage';

// Streaming utilities for handling SSE and NDJSON safely
function normalizeNewlines(s: string): string {
Expand Down Expand Up @@ -145,7 +135,6 @@ function parsePossiblyConcatenatedJson(payload: string): any[] {
// };

export const Chat = () => {
const { t } = useTranslation('chat');
const {
state: {
selectedConversation,
Expand All @@ -162,13 +151,11 @@ export const Chat = () => {
intermediateStepOverride,
enableIntermediateSteps,
},
handleUpdateConversation,
dispatch: homeDispatch,
} = useContext(HomeContext);

const [currentMessage, setCurrentMessage] = useState<Message>();
const [autoScrollEnabled, setAutoScrollEnabled] = useState<boolean>(true);
const [showSettings, setShowSettings] = useState<boolean>(false);
const [showScrollDownButton, setShowScrollDownButton] =
useState<boolean>(false);

Expand Down Expand Up @@ -321,7 +308,9 @@ export const Chat = () => {
'ws://127.0.0.1:8000/websocket';

// Determine if this is a cross-origin connection
// eslint-disable-next-line no-unused-vars
const wsUrlObj = new URL(wsUrl);
// eslint-disable-next-line no-unused-vars
const isCrossOrigin = wsUrlObj.origin !== window.location.origin;

// Always add session cookie as query parameter for reliability
Expand Down Expand Up @@ -389,7 +378,7 @@ export const Chat = () => {
}
};

ws.onerror = error => {
ws.onerror = _error => {
homeDispatch({ field: 'webSocketConnected', value: false });
webSocketConnectedRef.current = false;
homeDispatch({ field: 'loading', value: false });
Expand All @@ -412,7 +401,8 @@ export const Chat = () => {
/**
* Handles OAuth consent flow by opening popup window
*/
const handleOAuthConsent = (message: WebSocketInbound) => {
// eslint-disable-next-line no-unused-vars
const _handleOAuthConsent = (message: WebSocketInbound) => {
if (!isSystemInteractionMessage(message)) return false;

if (message.content?.input_type === 'oauth_consent') {
Expand All @@ -423,7 +413,7 @@ export const Chat = () => {
'oauth-popup',
'width=600,height=700,scrollbars=yes,resizable=yes'
);
const handleOAuthComplete = (event: MessageEvent) => {
const handleOAuthComplete = (_event: MessageEvent) => {
if (popup && !popup.closed) popup.close();
window.removeEventListener('message', handleOAuthComplete);
};
Expand Down Expand Up @@ -703,7 +693,7 @@ export const Chat = () => {
};

const handleSend = useCallback(
async (message: Message, deleteCount = 0, retry = false) => {
async (message: Message, deleteCount = 0, _retry = false) => {
message.id = uuidv4();

// Set the active user message ID for WebSocket message tracking
Expand Down Expand Up @@ -863,8 +853,7 @@ export const Chat = () => {
};

const endpoint = getEndpoint({ service: 'chat' });
let body;
body = JSON.stringify({
const body = JSON.stringify({
...chatBody,
});

Expand Down Expand Up @@ -982,6 +971,7 @@ export const Chat = () => {
chunkValue = String(chunkValue ?? '');
}

// eslint-disable-next-line no-unused-vars
counter++;

// First, handle any partial chunk from previous iteration
Expand All @@ -1006,8 +996,8 @@ export const Chat = () => {
}

// Process complete intermediate steps
let rawIntermediateSteps: any[] = [];
let stepMatches =
const rawIntermediateSteps: any[] = [];
const stepMatches =
chunkValue.match(
/<intermediatestep>([\s\S]*?)<\/intermediatestep>/g
) || [];
Expand All @@ -1017,7 +1007,7 @@ export const Chat = () => {
.replace('<intermediatestep>', '')
.replace('</intermediatestep>', '')
.trim();
let rawIntermediateMessage = tryParseJson<any>(jsonString);
const rawIntermediateMessage = tryParseJson<any>(jsonString);
// handle intermediate data
if (rawIntermediateMessage?.type === 'system_intermediate') {
rawIntermediateSteps.push(rawIntermediateMessage);
Expand Down Expand Up @@ -1343,7 +1333,6 @@ export const Chat = () => {
ref={chatContainerRef}
onScroll={handleScroll}
>
<ChatHeader webSocketModeRef={webSocketModeRef} />
{selectedConversation?.messages.map((message, index) => {
if (!shouldRenderAssistantMessage(message)) {
return null; // Hide empty assistant messages
Expand Down
22 changes: 14 additions & 8 deletions components/Chat/ChatHeader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import {
IconChevronRight,
} from '@tabler/icons-react';
import React, { useContext, useState, useRef, useEffect } from 'react';

import { env } from 'next-runtime-env';

import { getWorkflowName } from '@/utils/app/helper';

import { useTheme } from '@/contexts/ThemeContext';
import HomeContext from '@/pages/api/home/home.context';

import { DataStreamControls } from './DataStreamControls';

export const ChatHeader = ({ webSocketModeRef = {} }) => {
const [isMenuOpen, setIsMenuOpen] = useState(false);
const [isExpanded, setIsExpanded] = useState(
Expand All @@ -34,12 +35,13 @@ export const ChatHeader = ({ webSocketModeRef = {} }) => {
chatHistory,
webSocketMode,
webSocketConnected,
lightMode,
selectedConversation,
},
dispatch: homeDispatch,
} = useContext(HomeContext);

const { lightMode, setLightMode } = useTheme();

const handleLogin = () => {
console.log('Login clicked');
setIsMenuOpen(false);
Expand Down Expand Up @@ -73,7 +75,7 @@ export const ChatHeader = ({ webSocketModeRef = {} }) => {
) : (
<div className="absolute top-1/2 left-1/2 transform -translate-x-1/2 -translate-y-1/2 mx-auto flex flex-col space-y-5 md:space-y-10 px-3 pt-5 md:pt-12 sm:max-w-[600px] text-center">
<div className="text-3xl font-semibold text-gray-800 dark:text-white">
Hi, I'm {workflow}
Hi, I&apos;m {workflow}
</div>
<div className="text-lg text-gray-600 dark:text-gray-400">
How can I assist you today?
Expand All @@ -85,6 +87,10 @@ export const ChatHeader = ({ webSocketModeRef = {} }) => {
<div
className={`fixed right-0 top-0 h-12 flex items-center transition-all duration-300 ${
isExpanded ? 'mr-2' : 'mr-2'
} ${
selectedConversation?.messages?.length === 0
? 'bg-none'
: 'bg-[#76b900] dark:bg-black bo'
}`}
>
<button
Expand Down Expand Up @@ -173,15 +179,15 @@ export const ChatHeader = ({ webSocketModeRef = {} }) => {
</label>
</div>

{/* Data Stream Controls - Manages data stream display toggle and database updates button */}
<DataStreamControls />

{/* Theme Toggle Button */}
<div className="flex items-center dark:text-white text-black transition-colors duration-300">
<button
onClick={() => {
const newMode = lightMode === 'dark' ? 'light' : 'dark';
homeDispatch({
field: 'lightMode',
value: newMode,
});
setLightMode(newMode);
}}
className="rounded-full flex items-center justify-center bg-none dark:bg-gray-700 transition-colors duration-300 focus:outline-none"
>
Expand Down
Loading