Skip to content

Commit

Permalink
Merge pull request #19 from indexnetwork/kafka-chromadb
Browse files Browse the repository at this point in the history
feat (search): query endpoint added
  • Loading branch information
serefyarar authored Feb 8, 2024
2 parents 114d9c9 + 6ef90aa commit e54cfa0
Show file tree
Hide file tree
Showing 17 changed files with 467 additions and 245 deletions.
42 changes: 39 additions & 3 deletions api/src/libs/kafka-indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const createIndexItemEvent = async (id) => {
const indexSession = await getPKPSessionForIndexer(indexItem.index);
await indexSession.did.authenticate();

const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/embeddings`, {
const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embedding`, {
text: indexItem.item.content
})
const embeddingService = new EmbeddingService().setSession(indexSession)
Expand All @@ -45,6 +45,43 @@ export const updateIndexItemEvent = async (id) => {

/**
 * Kafka consumer hook: propagates a WebPage change to the LLM indexer.
 *
 * Looks up the index item by `id`, authenticates a PKP session for its index,
 * then either deletes the item from the indexer (when soft-deleted) or PUTs
 * the current item payload to `/indexer/index`.
 *
 * Note: axios throws for non-2xx responses by default, so the `else` branches
 * below only fire for unusual 2xx statuses; real HTTP failures land in `catch`.
 */
export const updateWebPageEvent = async (id) => {
  console.log("updateWebPageEvent", id)

  const itemService = new ItemService()
  const webPage = await itemService.getIndexItemById(id);

  try {

    const indexSession = await getPKPSession(webPage.index);
    await indexSession.did.authenticate();

    // Soft-deleted items are removed from the indexer instead of updated.
    if (webPage.item.deletedAt) {
      // NOTE(review): the ":" in `item:<id>` is unusual for a REST path —
      // confirm the indexer route really is `/indexer/item:{id}` and not `/indexer/item/{id}`.
      const deleteResponse = await axios.delete(`${process.env.LLM_INDEXER_HOST}/indexer/item:${webPage.item.id}`);

      if (deleteResponse.status === 200) {
        console.log("IndexItem Deleted.")
      } else {
        console.log("IndexItem Deletion Failed.")
      }

      // Bug fix: the original fell through and re-PUT the deleted item below,
      // re-indexing content that was just removed. Stop after deletion.
      return;
    }

    console.log(`${JSON.stringify(webPage.item)} `)

    const updateResponse = await axios.put(
      `${process.env.LLM_INDEXER_HOST}/indexer/index`,
      {
        ...webPage.item
      });

    if (updateResponse.status === 200) {
      console.log("IndexItem Updated.") // fixed log typo: was "IndexItem Update."
    } else {
      console.log("IndexItem Update Failed.")
    }

  } catch (e) {
    // Best-effort event handler: log and swallow so the consumer keeps running.
    console.log(e)
  }

}

export const createEmbeddingEvent = async (id) => {
Expand Down Expand Up @@ -81,7 +118,7 @@ export const createEmbeddingEvent = async (id) => {
}

try {
const indexResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/index`, payload)
const indexResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/index`, payload)
} catch (e) {
console.log(e)
}
Expand All @@ -93,4 +130,3 @@ export const createEmbeddingEvent = async (id) => {
// Kafka consumer hook for embedding-update events.
// Stub: only logs the incoming stream id — no re-embedding is performed yet.
export const updateEmbeddingEvent = async (id) => {
console.log("updateEmbeddingEvent", id)
}

29 changes: 27 additions & 2 deletions api/src/services/webpage.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class WebPageService {
title
favicon
url
content
content
createdAt
updatedAt
deletedAt
Expand Down Expand Up @@ -70,6 +70,31 @@ export class WebPageService {
}
}

}
async getWebPageById(webPageId) {

try {
const {data, errors} = await this.client.executeQuery(`
{
node(id: "${webPageId}") {
${webPageFragment}
}
}`);

// Handle GraphQL errors
if (errors) {
throw new Error(`Error getting index item: ${JSON.stringify(errors)}`);
}
// Validate the data response
if (!data || !data.node) {
throw new Error('Invalid response data');
}

return data.node;

} catch (error) {
// Log the error and rethrow it for external handling
console.error('Exception occurred in getWebPageById:', error);
throw error;
}
}
}
2 changes: 1 addition & 1 deletion indexer/.env_example
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ MISTRAL_API_KEY=
MISTRAL_MODEL_EMBEDDING=mistral-embed
MISTRAL_MODEL_CHAT=mistral-small

UNSTRUCTURED_URL=http://localhost:8000/general/v0/general
UNSTRUCTURED_API_URL=http://localhost:8000/general/v0/general
UNSTRUCTURED_API_KEY=

export LANGCHAIN_TRACING_V2=true
Expand Down
25 changes: 7 additions & 18 deletions indexer/src/app/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module } from '@nestjs/common';
import { Module, Search } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { IndexerController } from '../indexer/controller/indexer.controller';
Expand All @@ -10,6 +10,8 @@ import { IndexerService } from 'src/indexer/service/indexer.service';
import { HttpModule } from '@nestjs/axios';
import { AgentModule } from './modules/agent.module';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { SearchController } from 'src/search/controller/search.controller';
import { SearchService } from 'src/search/service/search.service';

@Module({
imports: [
Expand All @@ -20,31 +22,18 @@ import { ClientsModule, Transport } from '@nestjs/microservices';
ChromaModule.register(process.env.OPENAI_API_KEY),
AgentModule.register(),

// TODO: Add event streamer to agents
// ClientsModule.register([{
// name: 'KAFKA_SERVICE',
// transport: Transport.KAFKA,
// options: {
// client: {
// clientId: 'indexer',
// brokers: [process.env.INDEXER_KAFKA_URI],
// },
// consumer: {
// groupId: 'indexer-agent',
// },
// },
// }]),

],
controllers: [
AppController,
IndexerController,
ChatController
ChatController,
SearchController
],
providers: [
AppService,
ChatService,
IndexerService
IndexerService,
SearchService
],
})
export class AppModule {}
116 changes: 68 additions & 48 deletions indexer/src/app/modules/agent.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ export class Agent {
// TODO: Can we also add context type by intent find?
// TODO: Research on low-computation models for subtasks


// IMPLEMENTATION TODOS:
// TODO: Add initial filtering according to the index (rag)
// TODO: Add initial filtering according to the index (retriever)
// TODO: Add standalone question pipeline

constructor (
) {
const apiKey = process.env.OPENAI_API_KEY;
Expand All @@ -33,41 +39,46 @@ export class Agent {
this.apiKey = apiKey;
}

public async createAgentChain(chain_type: string, index_id: string, model_type: string) {
public async createAgentChain(chain_type: string = 'rag-v0', indexIds: string[], model_type: string = 'OpenAI'): Promise<any> {

switch (chain_type) {

case 'rag-v0':
return this.createRAGChain(index_id, model_type);
return this.createRAGChain(indexIds, model_type);

default:
throw new Error('Chain type not supported');
}
}

public async createRetrieverChain(chain_type: string, index_id: string, model_type: string, k: number) {
public async createRetrieverChain(chain_type: string = 'query-v0', index_ids: string[], model_type: string = 'OpenAI', page: number, skip: number, limit: number): Promise<any> {

switch (chain_type) {

case 'query-v0':
return this.createQueryRetriever(index_id, model_type, k);
return this.createQueryRetriever(index_ids, model_type, page, skip, limit);

default:
throw new Error('Chain type not supported');
}
}


private async createRAGChain (chroma_index: string, model_type: string): Promise<any>{
private async createRAGChain (chroma_indices: string[], model_type: string): Promise<any>{

let model: any;
if (model_type == 'OpenAI') { model = new ChatOpenAI({ modelName: process.env.MODEL_CHAT }) }
else if (model_type == 'MistralAI') { model = new ChatMistralAI({ modelName: process.env.MISTRAL_MODEL_CHAT, apiKey: process.env.MISTRAL_API_KEY }) }

const vectorStore = await Chroma.fromExistingCollection(
new OpenAIEmbeddings({ openAIApiKey: process.env.OPENAI_API_KEY, modelName: process.env.MODEL_EMBEDDING}),
{ url: process.env.CHROMA_URL, collectionName: chroma_index }
);
new OpenAIEmbeddings({ modelName: process.env.MODEL_EMBEDDING}),
{
url: process.env.CHROMA_URL,
collectionName: process.env.CHROMA_COLLECTION_NAME,
filter: {
indexId: chroma_indices
}
});

const retriever = vectorStore.asRetriever();

Expand Down Expand Up @@ -104,8 +115,8 @@ export class Agent {
if (Array.isArray(chatHistory)) {
const updatedChat = chatHistory.map(
(dialogTurn: any) => {
if (dialogTurn.hasOwnProperty('human')) { return `Human: ${dialogTurn['human']}` }
if (dialogTurn.hasOwnProperty('ai')) { return `AI: ${dialogTurn['ai']}` }
if (dialogTurn['speaker'] == 'human') { return `Human: ${dialogTurn['text']}` }
if (dialogTurn['speaker'] == 'ai') { return `AI: ${dialogTurn['text']}` }
}
).join("\n");
Logger.log(updatedChat, "ChatService:formatChatHistory");
Expand Down Expand Up @@ -181,7 +192,7 @@ export class Agent {

}

private async createQueryRetriever (chroma_index: string, model_type: string, k: number) {
private async createQueryRetriever (chroma_indices: string[], model_type: string, page: number, skip: number, limit: number) {

// Not implemented yet
// https://js.langchain.com/docs/modules/data_connection/retrievers/self_query/chroma-self-query
Expand Down Expand Up @@ -244,56 +255,65 @@ export class Agent {
},

];

const documentContents = 'Document metadata'

let model: any;
if (model_type == 'OpenAI' ) { model = new ChatOpenAI({ modelName: process.env.MODEL_CHAT }) }
else if (model_type == 'MistralAI') { model = new ChatMistralAI({ modelName: process.env.MISTRAL_MODEL_CHAT, apiKey: process.env.MISTRAL_API_KEY }) }

const vectorStore = await Chroma.fromExistingCollection(
new OpenAIEmbeddings({openAIApiKey: process.env.OPENAI_API_KEY, modelName: process.env.MODEL_EMBEDDING}),
{ url: process.env.CHROMA_URL, collectionName: chroma_index }
);

// Maybe we can generate doc contents as well?
const documentContents = 'Document metadata'
const embeddings = new OpenAIEmbeddings({
verbose: true,
openAIApiKey: process.env.OPENAI_API_KEY,
modelName: process.env.MODEL_EMBEDDING
})

Logger.log(`Creating vector store with ${process.env.MODEL_EMBEDDING} embeddings`, "Agent:createQueryRetriever");

Logger.log("Creating retriever", "Agent:createQueryRetriever");
Logger.log(JSON.stringify(model), "Agent:createQueryRetriever:model");

const selfQueryRetriever = SelfQueryRetriever.fromLLM({
llm: model,
vectorStore,
documentContents,
attributeInfo,
structuredQueryTranslator: new ChromaTranslator(),
searchParams: {
k: k,
mergeFiltersOperator: 'and',
const vectorStore = await Chroma.fromExistingCollection(
embeddings,
{
url: process.env.CHROMA_URL,
collectionName: process.env.CHROMA_COLLECTION_NAME,
filter: {
indexId: chroma_indices
}
});
});

return selfQueryRetriever;
const final_chain = RunnableSequence.from([
{
documents: async (input) => {
const queryEmbedding = await embeddings.embedQuery(input.query)
const docs = await vectorStore.collection.query({
queryEmbeddings: [queryEmbedding],
nResults: (page * limit),
})
return docs
}
},
{
documents: (input) => {
const ids = input.documents?.ids[0]
const similarities = input.documents?.distances[0]

return ids.map(function(id, i) {
return {
id: id,
similarity: similarities[i],
};
});
}
},
RunnableLambda.from((input) => {
return input.documents.slice((page-1)*limit, page*limit)
})
]);

return final_chain;
}



//* Helper functions

/** Normalizes chat history into a single string: arrays are joined with newlines,
 *  plain strings pass through unchanged. */
private async serializeChatHistory(chatHistory: string | string[]) {
  if (!Array.isArray(chatHistory)) {
    return chatHistory;
  }
  return chatHistory.join("\n");
};

/** Flattens retrieved documents into one prompt-ready string via
 *  `formatDocumentsAsString`.
 *  NOTE(review): `document_prompt` and `document_separator` are currently
 *  unused — kept for interface compatibility; confirm whether per-document
 *  templating was intended here. */
private async combineDocuments(documents: any[], document_prompt: PromptTemplate, document_separator: string) {
  return formatDocumentsAsString(documents);
}
}


Expand Down
4 changes: 2 additions & 2 deletions indexer/src/app/modules/chroma.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ export class ChromaModule {
if (!process.env.OPENAI_API_KEY) throw new Error('OpenAI API key is required');

Logger.debug(process.env.CHROMA_URL, 'ChromaModule');
Logger.debug(process.env.COLLECTION_NAME, 'ChromaModule');
Logger.debug(process.env.CHROMA_COLLECTION_NAME, 'ChromaModule');

const vectorStore = await Chroma.fromExistingCollection(
new OpenAIEmbeddings({
openAIApiKey: apiKey,
modelName: process.env.MODEL_EMBEDDING,
}),
{
collectionName: process.env.COLLECTION_NAME,
collectionName: process.env.CHROMA_COLLECTION_NAME,
url: process.env.CHROMA_URL,
}
);
Expand Down
Loading

0 comments on commit e54cfa0

Please sign in to comment.