Skip to content

Commit

Permalink
Merge branch 'contextual-pubsub' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
serefyarar committed Jun 1, 2024
2 parents 2560501 + bc49611 commit f6ff1c1
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 29 deletions.
1 change: 1 addition & 0 deletions api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"ethers": "5.7.1",
"express": "^4.18.2",
"express-joi-validation": "^5.0.1",
"express-ws": "^5.0.2",
"fetch": "^1.1.0",
"global": "^4.4.0",
"http-errors": "~1.6.3",
Expand Down
33 changes: 19 additions & 14 deletions api/src/clients/redis.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
import dotenv from 'dotenv'
if(process.env.NODE_ENV !== 'production'){
dotenv.config()
import dotenv from "dotenv";
if (process.env.NODE_ENV !== "production") {
dotenv.config();
}


import { createClient } from 'redis';
import { createClient } from "redis";

class RedisClient {

constructor() {
throw new Error('Use Singleton.getInstance()');

throw new Error("Use Singleton.getInstance()");
}

static getInstance() {
if (!RedisClient.client) {
RedisClient.client = createClient({
url: process.env.REDIS_CONNECTION_STRING
});
}
return RedisClient.client;
if (!RedisClient.client) {
RedisClient.client = createClient({
url: process.env.REDIS_CONNECTION_STRING,
});
}
return RedisClient.client;
}

static getPubSubInstance() {
if (!RedisClient.pubsub) {
RedisClient.pubsub = createClient({
url: process.env.REDIS_CONNECTION_STRING,
});
}
return RedisClient.pubsub;
}
}

export default RedisClient;
76 changes: 74 additions & 2 deletions api/src/controllers/discovery.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import axios from "axios";

import { ethers } from "ethers";
import { CeramicClient } from "@ceramicnetwork/http-client";

import RedisClient from "../clients/redis.js";
import { flattenSources } from "../utils/helpers.js";
import { DIDService } from "../services/did.js";

const redis = RedisClient.getInstance();
const pubSubClient = RedisClient.getPubSubInstance();

const ceramic = new CeramicClient(process.env.CERAMIC_HOST);

export const chat = async (req, res, next) => {
const definition = req.app.get("runtimeDefinition");
Expand Down Expand Up @@ -34,14 +37,83 @@ export const chat = async (req, res, next) => {
},
);
res.set(resp.headers);
resp.data.pipe(res);

let cmdMode = false;
let inferredCmd = "";
resp.data.on("data", (chunk) => {
const plainText = chunk.toString();
if (plainText.includes("<<")) {
cmdMode = true;
} else if (plainText.includes(">>")) {
cmdMode = false;
} else if (cmdMode) {
inferredCmd += plainText;
} else {
res.write(chunk);
}
});

resp.data.on("end", async () => {
if (inferredCmd) {
await redis.hSet(
`subscriptions`,
id,
JSON.stringify({
indexIds: reqIndexIds,
messages,
}),
);
await redis.publish(`newSubscription`, id);
}
console.log("Stream ended", inferredCmd);
res.end();
});
} catch (error) {
// Handle the exception
console.error("An error occurred:", error);
res.status(500).json({ error: "Internal Server Error" });
}
};

export const updates = async (ws, req) => {
//const definition = req.app.get("runtimeDefinition");
//const { id, messages, sources, ...rest } = req.body;

const { chatId } = req.params;
await pubSubClient.subscribe(`newUpdate:${chatId}`, async (itemId) => {
console.log(chatId, itemId, `newUpdate`);
console.log("New update triggered, fetching item data", itemId);
const subscriptionResp = await redis.hGet(`subscriptions`, chatId);
if (!subscriptionResp) {
return;
}
const { indexIds, messages } = JSON.parse(subscriptionResp);
const itemStream = await ceramic.loadStream(itemId);
const chatRequest = {
indexIds,
input: {
question: `
Determine if the following information is relevant to the previous conversation.
If it is relevant, output a conversation simulating that you received real-time news for the user.
Use conversational output format suitable to data model, use bold texts and links when available.
Do not mention relevancy check, just share it as an update.
If it is not relevant, simply say "NOT_RELEVANT.
Information: ${JSON.stringify(itemStream.content)}
`,
chat_history: messages,
},
};
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
chatRequest,
{
responseType: "text",
},
);
console.log("Update evaluation response", resp.data);
ws.send(resp.data);
});
};
export const search = async (req, res, next) => {
try {
const searchRequest = {
Expand Down
25 changes: 25 additions & 0 deletions api/src/controllers/farcaster.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { ComposeDBService } from "../services/composedb.js";

export const createWebPage = async (req, res, next) => {
const definition = req.app.get("runtimeDefinition");
const modelFragments = req.app.get("modelFragments");
const castFragment = modelFragments.filter(
(fragment) => fragment.name === "FarcasterCast",
)[0];

//todo get fragment
try {
const composeDBService = new ComposeDBService(
definition,
castFragment,
).setSession(req.session);

const cast = await composeDBService.createNode({
...req.body,
});

res.status(201).json(cast);
} catch (error) {
res.status(500).json({ error: error.message });
}
};
14 changes: 14 additions & 0 deletions api/src/libs/indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Logger from "../utils/logger.js";
const logger = Logger.getInstance();

const redis = RedisClient.getInstance();

const ceramic = new CeramicClient(process.env.CERAMIC_HOST);

if (process.env.NODE_ENV !== "production") {
Expand Down Expand Up @@ -232,6 +233,17 @@ class Indexer {
const embedding = await embeddingService.getEmbeddingById(id);
const stream = await ceramic.loadStream(embedding.item.id);

const allSubscriptions = await redis.hGetAll(`subscriptions`);
for (const [chatId, subscriptionPayload] of Object.entries(
allSubscriptions,
)) {
// Parse the JSON string to an object
let subscription = JSON.parse(subscriptionPayload);
console.log(embedding.item.id, embedding.index.id, subscription.indexIds);
if (subscription.indexIds.indexOf(embedding.index.id) >= 0) {
await redis.publish(`newUpdate:${chatId}`, embedding.item.id);
}
}
const doc = {
...stream.content,
id: stream.id.toString(),
Expand Down Expand Up @@ -265,6 +277,8 @@ class Indexer {
`${process.env.LLM_INDEXER_HOST}/indexer/index?indexId=${embedding.index.id}`,
payload,
);

// todo send fluence as well.
logger.info(
`Step [3]: Index ${embedding.indexId} with Item ${embedding.itemId} indexed with it's content and embeddings`,
);
Expand Down
10 changes: 9 additions & 1 deletion api/src/packages/api.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import dotenv from "dotenv";
import Moralis from "moralis";
import express from "express";
import expressWs from "express-ws";

import Joi from "joi";
import * as ejv from "express-joi-validation";

import RedisClient from "../clients/redis.js";

import { createProxyMiddleware } from "http-proxy-middleware";

const app = express();
const { app } = expressWs(express());

if (process.env.NODE_ENV !== "production") {
dotenv.config();
Expand All @@ -17,6 +19,7 @@ if (process.env.NODE_ENV !== "production") {
const port = process.env.PORT || 3001;

const redis = RedisClient.getInstance();
const pubSubClient = RedisClient.getPubSubInstance();

import * as indexController from "../controllers/index.js";
import * as itemController from "../controllers/item.js";
Expand Down Expand Up @@ -517,9 +520,14 @@ app.delete(
// Validators
app.use(errorMiddleware);

//app.get("/discovery/stream", );

app.ws("/discovery/:chatId/updates", discoveryController.updates);

const start = async () => {
console.log("Starting API ...", port);
await redis.connect();
await pubSubClient.connect()

await setIndexedModelParams(app);

Expand Down
16 changes: 6 additions & 10 deletions api/src/packages/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,27 @@ import { EventSource } from "cross-eventsource";
import { JsonAsString, AggregationDocument } from "@ceramicnetwork/codecs";
import { decode } from "codeco";
import { fetchModelInfo } from "../utils/helpers.js";
import { createClient } from "redis";

const ceramicFirehose = new EventSource(
`${process.env.CERAMIC_HOST}/api/v0/feed/aggregation/documents`,
);
const Codec = JsonAsString.pipe(AggregationDocument);

const redis = RedisClient.getInstance();

const subClient = createClient({
url: process.env.REDIS_CONNECTION_STRING,
});
const redisClient = RedisClient.getInstance();
const pubSubClient = RedisClient.getPubSubInstance();

async function start() {
await subClient.connect();
await redis.connect();
await pubSubClient.connect();
await redisClient.connect();
let { runtimeDefinition, modelFragments } = await fetchModelInfo();
let indexer = new Indexer(runtimeDefinition, modelFragments);
subClient.subscribe("newModel", async (id) => {
pubSubClient.subscribe(`newModel`, async (id) => {
console.log("New model detected, fetching model info", id);
({ runtimeDefinition, modelFragments } = await fetchModelInfo());
indexer = new Indexer(runtimeDefinition, modelFragments);
});

subClient.subscribe("reIndex", async (id) => {
pubSubClient.subscribe(`reIndex`, async (id) => {
console.log("Reindex an item through external redis subscription.", id);
await indexer.createIndexItemEvent(id);
});
Expand Down
2 changes: 1 addition & 1 deletion api/src/services/item.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class ItemService {
);
return e;
});

console.log(data.indexItemIndex.edges);
return {
//Todo fix itemId to id
endCursor: data.indexItemIndex.pageInfo.endCursor,
Expand Down
9 changes: 8 additions & 1 deletion api/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5863,6 +5863,13 @@ express-joi-validation@^5.0.1:
resolved "https://registry.yarnpkg.com/express-joi-validation/-/express-joi-validation-5.0.1.tgz#ded03f35ba5d95b57da6ebe22b2b6d2e4ab074c4"
integrity sha512-BztcU64addcAdxys2j42pZVSnJjEyFaLxNko7YSYDUuEBtKq2XnhzYZuy9ex9Q+Fdhef+NwLXhX1djwZmShCLg==

express-ws@^5.0.2:
version "5.0.2"
resolved "https://registry.yarnpkg.com/express-ws/-/express-ws-5.0.2.tgz#5b02d41b937d05199c6c266d7cc931c823bda8eb"
integrity sha512-0uvmuk61O9HXgLhGl3QhNSEtRsQevtmbL94/eILaliEADZBHZOQUAiHFrGPrgsjikohyrmSG5g+sCfASTt0lkQ==
dependencies:
ws "^7.4.6"

express@^4.18.2:
version "4.19.2"
resolved "https://registry.yarnpkg.com/express/-/express-4.19.2.tgz#e25437827a3aa7f2a827bc8171bbbb664a356465"
Expand Down Expand Up @@ -10271,7 +10278,7 @@ [email protected]:
resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0"
integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==

ws@^7.5.1:
ws@^7.4.6, ws@^7.5.1:
version "7.5.9"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.9.tgz#54fa7db29f4c7cec68b1ddd3a89de099942bb591"
integrity sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==
Expand Down
10 changes: 10 additions & 0 deletions web-app/src/components/site/indexes/AskIndexes/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ const AskIndexes: FC<AskIndexesProps> = ({ chatID, sources }) => {
fetchDefaultQuestions();
}, [fetchDefaultQuestions]);

const ws = new WebSocket(`ws://localhost:8000/discovery/${chatID}/updates`);

ws.onmessage = async (event) => {
console.log(event)
await append({
id: chatID,
content: event.data,
role: "assistant",
});
};
const handleEditClick = (message: Message, indexOfMessage: number) => {
setEditingMessage(message);
setEditingIndex(indexOfMessage);
Expand Down

0 comments on commit f6ff1c1

Please sign in to comment.