Skip to content

Commit

Permalink
feat (api): add discovery api connection
Browse files Browse the repository at this point in the history
  • Loading branch information
karahan-sahin committed Feb 9, 2024
1 parent 697308b commit c66388b
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 97 deletions.
35 changes: 28 additions & 7 deletions api/src/controllers/discovery.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
import axios from 'axios';

export const chat = async (req, res, next) => {
try{
let resp = await axios.post(`${process.env.LLM_INDEXER_HOST}/chat/stream`, req.body, {
try {
const chatRequest = {
indexIds: req.body.indexIds,
input: {
question: req.body.messages.at(-1).content,
chat_history: [...req.body.messages.slice(0, -1)]
}
}
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
chatRequest,
{
responseType: 'stream'
})
})
res.set(resp.headers);
resp.data.pipe(res);
} catch (error) {
Expand All @@ -13,12 +24,22 @@ export const chat = async (req, res, next) => {
}
};


export const search = async (req, res, next) => {
try{
let resp = await axios.post(`${process.env.LLM_INDEXER_HOST}/search/query`, req.body, {
try {
const searchRequest = {
indexIds: req.body.indexIds,
query: req.body.query,
page: req.body.page || 1,
limit: req.body.limit || 10,
filters: req.body.filters || [],
}

let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/search/query`,
searchRequest,
{
responseType: 'stream'
})
})
res.set(resp.headers);
resp.data.pipe(res);
} catch (error) {
Expand Down
104 changes: 49 additions & 55 deletions api/src/libs/kafka-indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ if(process.env.NODE_ENV !== 'production'){
export const createIndexItemEvent = async (id) => {
console.log("createIndexItemEvent", id)

console.log("Creating index", process.env.LLM_INDEXER_HOST)

const itemService = new ItemService()
const indexItem = await itemService.getIndexItemById(id, false);

Expand All @@ -24,13 +26,12 @@ export const createIndexItemEvent = async (id) => {
// TODO:
// Crawl can be null due to some reasons,
// need to handle separately.
console.log("Indexer Item URL", `${process.env.LLM_INDEXER_HOST}/indexer/embedding`)
console.log("Indexer Item", indexItem.item.content)
console.log("Indexer Item", indexItem.item.title)

const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embedding`, {
content: 'This is a test content'
console.log("Indexer Item URL", `${process.env.LLM_INDEXER_HOST}/indexer/embeddings`)

const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embeddings`, {
content: indexItem.content ?? 'This is a test content'
})

const embeddingService = new EmbeddingService().setSession(indexSession)
const embedding = await embeddingService.createEmbedding({
"indexId": indexItem.indexId,
Expand All @@ -56,50 +57,60 @@ export const updateIndexItemEvent = async (id) => {
const itemService = new ItemService()
const indexItem = await itemService.getIndexItemById(id, false);


try {

console.log("Logging PKP Session", indexItem)
const indexSession = await getPKPSessionForIndexer(indexItem.index);

await indexSession.did.authenticate();

if (webPage.deletedAt) {
const deleteResponse = await axios.delete(`${process.env.LLM_INDEXER_HOST}/indexer/item:${webPage.item.id}`);
console.log("Logged PKP Session")

const updateURL = `${process.env.LLM_INDEXER_HOST}/indexer/item?indexId=${indexItem.indexId}&indexItemId=${indexItem.itemId}`
console.log("IndexItem Update URL", updateURL)

if (indexItem.deletedAt !== null) {

console.log("IndexItem Deleting.")

const deleteResponse = await axios.delete(updateURL);

console.log("IndexItem Delete Response", deleteResponse)

if (deleteResponse.status === 200) {
console.log("IndexItem Deleted.")
} else {
console.log("IndexItem Deletion Failed.")
}
}

const updateResponse = await axios.put(
`${process.env.LLM_INDEXER_HOST}/indexer/index`,
{
...webPage.item
});

if (updateResponse.status === 200) {
console.log("IndexItem Update.")

} else {
console.log("IndexItem Update Failed.")
}

//
const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embedding`, {
content: indexItem.item.content
})
const embeddingService = new EmbeddingService().setSession(indexSession)
const embedding = await embeddingService.createEmbedding({
"indexId": indexItem.indexId,
"itemId": indexItem.itemId,
"modelName": embeddingResponse.data.model,
"category": "document",
"vector": embeddingResponse.data.vector,
"description": "Default document embeddings",
});

console.log("Embedding created", embedding.id)
const updateResponse = await axios.put(
`${process.env.LLM_INDEXER_HOST}/indexer/index`,
{
embedding: indexItem.embedding,
metadata: {

indexTitle: embedding.index.title,
indexCreatedAt: embedding.index.createdAt,
indexUpdatedAt: embedding.index.updatedAt,
indexDeletedAt: embedding.index.deletedAt,
indexOwnerDID: embedding.index.ownerDID.id,

webPageId: embedding.item.id,
webPageTitle: embedding.item.title,
webPageUrl: embedding.item.url,
webPageContent: embedding.item.content,
webPageCreatedAt: embedding.item.createdAt,
webPageUpdatedAt: embedding.item.updatedAt,
webPageDeletedAt: embedding.item.deletedAt,
},
});

if (updateResponse.status === 200) {
console.log("IndexItem Update.")
} else {
console.log("IndexItem Update Failed.")
}
}

} catch (e) {
console.log("Indexer updateIndexItemEvent error:", e.message);
Expand All @@ -108,21 +119,6 @@ export const updateIndexItemEvent = async (id) => {

export const updateWebPageEvent = async (id) => {
console.log("updateWebPageEvent", id)

const itemService = new ItemService()
const webPage = await itemService.getIndexItemById(id, false);

try {

const indexSession = await getPKPSession(webPage.index);
await indexSession.did.authenticate();

console.log("Indexer Item", webPage.deletedAt)

} catch (e) {
console.log(e)
}

}

export const createEmbeddingEvent = async (id) => {
Expand Down Expand Up @@ -164,8 +160,6 @@ export const createEmbeddingEvent = async (id) => {
} catch (e) {
console.log(e)
}


}

export const updateEmbeddingEvent = async (id) => {
Expand Down
7 changes: 6 additions & 1 deletion api/src/packages/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@ app.delete('/embeddings', authCheckMiddleware, validator.body(Joi.object({
category: Joi.string().required()
})), embeddingController.deleteEmbedding);


// Discovery
app.post('/discovery/search', validator.body(Joi.object({
query: Joi.string().required(),
indexIds: Joi.array().items(Joi.string()).required(),
page: Joi.number().default(1),
limit: Joi.number().default(24),
filters: Joi.array().items(Joi.object()).optional(),
sort: Joi.string().optional(),
desc: Joi.bool().optional(),
})), discoveryController.search)

app.post('/discovery/chat', validator.body(Joi.object({
Expand Down
5 changes: 4 additions & 1 deletion api/src/packages/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function start() {

await redis.connect()
const consumerItems = kafka.consumer({
groupId: `index-consumer-dev-13`,
groupId: `index-consumer-dev-131`,
sessionTimeout: 300000,
heartbeatInterval: 10000,
rebalanceTimeout: 3000,
Expand All @@ -47,6 +47,9 @@ async function start() {
let docId = value.stream_id;
try {
switch (topic) {

// TODO: Add index delete update events

case definition.models.IndexItem.id:
switch (op) {
case "c":
Expand Down
18 changes: 6 additions & 12 deletions indexer/src/indexer/controller/indexer.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Body, Controller, Delete, Get, Patch, Post, Put, Query } from '@nestjs/common';
import { Body, Controller, Delete, Get, Logger, Patch, Post, Put, Query } from '@nestjs/common';
import { IndexerService } from '../service/indexer.service';
import { CrawlRequestBody, EmbeddingRequestBody, IndexDeleteQuery, IndexItemDeleteQuery, IndexRequestBody, IndexUpdateBody, IndexUpdateQuery } from '../schema/indexer.schema';
import { ChromaClient } from 'chromadb';
Expand All @@ -15,7 +15,7 @@ export class IndexerController {
return this.indexerService.crawl(body.url);
}

@Post('/embedding')
@Post('/embeddings')
async embeddings(@Body() body: EmbeddingRequestBody) {
return this.indexerService.embed(body.content);
}
Expand All @@ -24,17 +24,10 @@ export class IndexerController {

@Post('/index')
@ApiQuery({ name: 'indexId', required: true })
async createIndex(@Query('indexId') indexId: string, @Body() body: IndexRequestBody) {
async updateIndex(@Query('indexId') indexId: string, @Body() body: IndexRequestBody) {
return this.indexerService.index(indexId, body);
}

@Patch('/index')
@ApiQuery({ name: 'indexId', required: true })
async updateIndex(@Query('indexId') indexId: string, @Body() body: IndexUpdateQuery) {
// TODO: Add update whole index -> update all index items
// return this.indexerService.update(indexId, null);
}

@Delete('/index')
@ApiQuery({ name: 'indexId', required: true })
async deleteIndex(@Query('indexId') indexId: string) {
Expand All @@ -53,13 +46,14 @@ export class IndexerController {

@Patch('/item')
@ApiQuery({ name: 'indexItemId', required: true })
async updateIndexItem(@Query('indexId') indexId: string, @Body() body: IndexUpdateQuery) {
// return this.indexerService.update(indexId, null, body);
async updateIndexItem(@Query('indexId') indexId: string, @Query('indexItemId') indexItemId: string, @Body() body: IndexUpdateBody) {
return this.indexerService.update(indexId, indexItemId, body);
}

@Delete('/item')
@ApiQuery({ name: 'indexItemId', required: true })
async deleteIndexItem(@Query('indexId') indexId: string, @Query('indexItemId') indexItemId: string) {
Logger.log(`Deleting ${indexId} ${indexItemId}`, 'indexerController:deleteIndexItem');
return this.indexerService.delete(indexId, indexItemId);
}

Expand Down
2 changes: 1 addition & 1 deletion indexer/src/indexer/schema/indexer.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class IndexRequestBody {
example: ''
})
@Type(() => String)
webPageContent: string;
webPageContent: string | null;

@ApiProperty({
description: 'Date of web page indexing',
Expand Down
Loading

0 comments on commit c66388b

Please sign in to comment.