Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cache Manager #378

Merged
merged 15 commits into from
Nov 21, 2024
3 changes: 2 additions & 1 deletion agent/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
!character.ts
.env
*.env
.env*
.env*
/data
77 changes: 59 additions & 18 deletions agent/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import { PostgresDatabaseAdapter } from "@ai16z/adapter-postgres";
import { SqliteDatabaseAdapter } from "@ai16z/adapter-sqlite";
import { DirectClientInterface } from "@ai16z/client-direct";
import { DirectClient, DirectClientInterface } from "@ai16z/client-direct";
import { DiscordClientInterface } from "@ai16z/client-discord";
import { AutoClientInterface } from "@ai16z/client-auto";
import { TelegramClientInterface } from "@ai16z/client-telegram";
import { TwitterClientInterface } from "@ai16z/client-twitter";
import {
DbCacheAdapter,
defaultCharacter,
FsCacheAdapter,
ICacheManager,
IDatabaseCacheAdapter,
stringToUuid,
AgentRuntime,
settings,
CacheManager,
Character,
IAgentRuntime,
ModelProviderName,
elizaLogger,
settings,
IDatabaseAdapter,
} from "@ai16z/eliza";
import { bootstrapPlugin } from "@ai16z/plugin-bootstrap";
import { solanaPlugin } from "@ai16z/plugin-solana";
Expand All @@ -21,7 +28,12 @@ import Database from "better-sqlite3";
import fs from "fs";
import readline from "readline";
import yargs from "yargs";
import { character } from "./character.ts";
import path from "path";
import { fileURLToPath } from "url";
import { character } from "./character";

const __filename = fileURLToPath(import.meta.url); // get the resolved path to the file
const __dirname = path.dirname(__filename); // get the name of the directory

export const wait = (minTime: number = 1000, maxTime: number = 3000) => {
const waitTime =
Expand Down Expand Up @@ -96,7 +108,7 @@ export async function loadCharacters(
} catch (e) {
console.error(`Error loading character from ${path}: ${e}`);
// don't continue to load if a specified file is not found
process.exit(1)
process.exit(1);
}
}
}
Expand Down Expand Up @@ -165,13 +177,19 @@ export function getTokenForProvider(
}
}

function initializeDatabase() {
function initializeDatabase(dataDir: string) {
if (process.env.POSTGRES_URL) {
return new PostgresDatabaseAdapter({
const db = new PostgresDatabaseAdapter({
connectionString: process.env.POSTGRES_URL,
});
return db;
} else {
return new SqliteDatabaseAdapter(new Database("./db.sqlite"));
const filePath = path.resolve(
dataDir,
process.env.SQLITE_FILE ?? "db.sqlite"
);
const db = new SqliteDatabaseAdapter(new Database(filePath));
return db;
}
}

Expand Down Expand Up @@ -215,9 +233,10 @@ export async function initializeClients(
return clients;
}

export async function createAgent(
export function createAgent(
character: Character,
db: any,
db: IDatabaseAdapter,
cache: ICacheManager,
token: string
) {
elizaLogger.success(
Expand All @@ -240,29 +259,51 @@ export async function createAgent(
actions: [],
services: [],
managers: [],
cacheManager: cache,
});
}

async function startAgent(character: Character, directClient: any) {
function intializeFsCache(baseDir: string, character: Character) {
const cacheDir = path.resolve(baseDir, character.id, "cache");

const cache = new CacheManager(new FsCacheAdapter(cacheDir));
return cache;
}

function intializeDbCache(character: Character, db: IDatabaseCacheAdapter) {
const cache = new CacheManager(new DbCacheAdapter(db, character.id));
return cache;
}

async function startAgent(character: Character, directClient: DirectClient) {
try {
character.id ??= stringToUuid(character.name);

const token = getTokenForProvider(character.modelProvider, character);
const db = initializeDatabase();
const dataDir = path.join(__dirname, "../data");

const runtime = await createAgent(character, db, token);
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}

const clients = await initializeClients(
character,
runtime as IAgentRuntime
);
const db = initializeDatabase(dataDir);

await db.init();

const cache = intializeDbCache(character, db);
const runtime = createAgent(character, db, cache, token);

const clients = await initializeClients(character, runtime);

directClient.registerAgent(await runtime);
directClient.registerAgent(runtime);

return clients;
} catch (error) {
console.error(
elizaLogger.error(
`Error starting agent for character ${character.name}:`,
error
);
console.error(error);
throw error;
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/adapter-postgres/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"types": "dist/index.d.ts",
"dependencies": {
"@ai16z/eliza": "workspace:*",
"@types/pg": "^8.11.10",
"pg": "^8.13.1"
},
"devDependencies": {
Expand Down
33 changes: 21 additions & 12 deletions packages/adapter-postgres/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE EXTENSION IF NOT EXISTS vector;

BEGIN;

CREATE TABLE accounts (
CREATE TABLE IF NOT EXISTS accounts (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"name" TEXT,
Expand All @@ -25,12 +25,12 @@ CREATE TABLE accounts (
"details" JSONB DEFAULT '{}'::jsonb
);

CREATE TABLE rooms (
CREATE TABLE IF NOT EXISTS rooms (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE memories (
CREATE TABLE IF NOT EXISTS memories (
"id" UUID PRIMARY KEY,
"type" TEXT NOT NULL,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
Expand All @@ -45,7 +45,7 @@ CREATE TABLE memories (
CONSTRAINT fk_agent FOREIGN KEY ("agentId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE goals (
CREATE TABLE IF NOT EXISTS goals (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID REFERENCES accounts("id"),
Expand All @@ -58,7 +58,7 @@ CREATE TABLE goals (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE logs (
CREATE TABLE IF NOT EXISTS logs (
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID NOT NULL REFERENCES accounts("id"),
Expand All @@ -69,7 +69,7 @@ CREATE TABLE logs (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE participants (
CREATE TABLE IF NOT EXISTS participants (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userId" UUID REFERENCES accounts("id"),
Expand All @@ -81,7 +81,7 @@ CREATE TABLE participants (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE relationships (
CREATE TABLE IF NOT EXISTS relationships (
"id" UUID PRIMARY KEY,
"createdAt" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"userA" UUID NOT NULL REFERENCES accounts("id"),
Expand All @@ -93,11 +93,20 @@ CREATE TABLE relationships (
CONSTRAINT fk_user FOREIGN KEY ("userId") REFERENCES accounts("id") ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS cache (
"key" TEXT NOT NULL,
"agentId" TEXT NOT NULL,
"value" JSONB DEFAULT '{}'::jsonb,
"createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"expiresAt" TIMESTAMP,
PRIMARY KEY ("key", "agentId")
);

-- Indexes
CREATE INDEX idx_memories_embedding ON memories USING hnsw ("embedding" vector_cosine_ops);
CREATE INDEX idx_memories_type_room ON memories("type", "roomId");
CREATE INDEX idx_participants_user ON participants("userId");
CREATE INDEX idx_participants_room ON participants("roomId");
CREATE INDEX idx_relationships_users ON relationships("userA", "userB");
CREATE INDEX IF NOT EXISTS idx_memories_embedding ON memories USING hnsw ("embedding" vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_memories_type_room ON memories("type", "roomId");
CREATE INDEX IF NOT EXISTS idx_participants_user ON participants("userId");
CREATE INDEX IF NOT EXISTS idx_participants_room ON participants("roomId");
CREATE INDEX IF NOT EXISTS idx_relationships_users ON relationships("userA", "userB");

COMMIT;
96 changes: 89 additions & 7 deletions packages/adapter-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { v4 } from "uuid";
import pg from "pg";
import pg, { type Pool } from "pg";
import {
Account,
Actor,
Expand All @@ -8,18 +8,27 @@ import {
type Memory,
type Relationship,
type UUID,
type IDatabaseCacheAdapter,
Participant,
DatabaseAdapter,
} from "@ai16z/eliza";
import { DatabaseAdapter } from "@ai16z/eliza";
const { Pool } = pg;
import fs from "fs";
import { fileURLToPath } from "url";
import path from "path";

export class PostgresDatabaseAdapter extends DatabaseAdapter {
private pool: typeof Pool;
const __filename = fileURLToPath(import.meta.url); // get the resolved path to the file
const __dirname = path.dirname(__filename); // get the name of the directory

export class PostgresDatabaseAdapter
extends DatabaseAdapter<Pool>
implements IDatabaseCacheAdapter
{
private pool: Pool;

constructor(connectionConfig: any) {
super();

this.pool = new Pool({
this.pool = new pg.Pool({
...connectionConfig,
max: 20,
idleTimeoutMillis: 30000,
Expand All @@ -29,8 +38,22 @@ export class PostgresDatabaseAdapter extends DatabaseAdapter {
this.pool.on("error", (err) => {
console.error("Unexpected error on idle client", err);
});
}

async init() {
await this.testConnection();

this.testConnection();
try {
const client = await this.pool.connect();
const schema = fs.readFileSync(
path.resolve(__dirname, "../schema.sql"),
"utf8"
);
await client.query(schema);
} catch (error) {
console.error(error);
throw error;
}
}

async testConnection(): Promise<boolean> {
Expand Down Expand Up @@ -849,6 +872,65 @@ export class PostgresDatabaseAdapter extends DatabaseAdapter {
throw new Error("Failed to fetch actor details");
}
}

async getCache(params: {
key: string;
agentId: UUID;
}): Promise<string | undefined> {
const client = await this.pool.connect();
try {
const sql = `SELECT "value"::TEXT FROM cache WHERE "key" = $1 AND "agentId" = $2`;
const { rows } = await this.pool.query<{ value: string }>(sql, [
params.key,
params.agentId,
]);

return rows[0]?.value ?? undefined;
} catch (error) {
console.log("Error fetching cache", error);
} finally {
client.release();
}
}

async setCache(params: {
key: string;
agentId: UUID;
value: string;
}): Promise<boolean> {
const client = await this.pool.connect();
try {
await client.query(
`INSERT INTO cache ("key", "agentId", "value", "createdAt") VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
ON CONFLICT ("key", "agentId")
DO UPDATE SET "value" = EXCLUDED.value, "createdAt" = CURRENT_TIMESTAMP`,
[params.key, params.agentId, params.value]
);
return true;
} catch (error) {
console.log("Error adding cache", error);
} finally {
client.release();
}
}

async deleteCache(params: {
key: string;
agentId: UUID;
}): Promise<boolean> {
const client = await this.pool.connect();
try {
await client.query(
`DELETE FROM cache WHERE "key" = $1 AND "agentId" = $2`,
[params.key, params.agentId]
);
return true;
} catch (error) {
console.log("Error adding cache", error);
} finally {
client.release();
}
}
}

export default PostgresDatabaseAdapter;
Loading