diff --git a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/serialization/Movie.kt b/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/serialization/Movie.kt index c9c19a605..3032ae927 100644 --- a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/serialization/Movie.kt +++ b/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/serialization/Movie.kt @@ -1,15 +1,32 @@ package com.xebia.functional.xef.conversation.serialization +import com.xebia.functional.xef.conversation.Conversation import com.xebia.functional.xef.conversation.llm.openai.OpenAI -import com.xebia.functional.xef.conversation.llm.openai.prompt +import com.xebia.functional.xef.prompt.Prompt +import com.xebia.functional.xef.store.LocalVectorStore import kotlinx.serialization.Serializable @Serializable data class Movie(val title: String, val genre: String, val director: String) suspend fun main() { - OpenAI.conversation { - val movie: Movie = - prompt("Please provide a movie title, genre and director for the Inception movie") - println("The movie ${movie.title} is a ${movie.genre} film directed by ${movie.director}.") - } + // This example contemplate the case of calling OpenAI directly or + // calling through a local Xef Server instance. + // To run the example with the Xef Server, you can execute the following commands: + // - # docker compose-up server/docker/postgresql + // - # ./gradlew server + val openAI = OpenAI() + // val openAI = OpenAI(host = "http://localhost:8081/") + val model = openAI.DEFAULT_SERIALIZATION + + val scope = Conversation(LocalVectorStore(openAI.DEFAULT_EMBEDDING)) + + model + .prompt( + Prompt("Please provide a movie title, genre and director for the Inception movie"), + scope, + Movie.serializer() + ) + .let { movie -> + println("The movie ${movie.title} is a ${movie.genre} film directed by ${movie.director}.") + } } diff --git a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraft.kt b/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraft.kt index 7dd38d451..3559219e5 100644 --- a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraft.kt +++ b/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraft.kt @@ -1,10 +1,12 @@ package com.xebia.functional.xef.conversation.streaming +import com.xebia.functional.xef.conversation.Conversation import com.xebia.functional.xef.conversation.Description import com.xebia.functional.xef.conversation.llm.openai.OpenAI import com.xebia.functional.xef.conversation.llm.openai.promptStreaming import com.xebia.functional.xef.llm.StreamedFunction import com.xebia.functional.xef.prompt.Prompt +import com.xebia.functional.xef.store.LocalVectorStore import kotlinx.serialization.Serializable @Serializable @@ -33,17 +35,31 @@ data class InterstellarCraft( ) suspend fun main() { - OpenAI.conversation { - promptStreaming(Prompt("Make a spacecraft with a mission to Mars")) - .collect { element -> - when (element) { - is StreamedFunction.Property -> { - println("${element.path} = ${element.value}") - } - is StreamedFunction.Result -> { - println(element.value) - } + // This example contemplate the case of calling OpenAI directly or + // calling through a local Xef Server instance. + // To run the example with the Xef Server, you can execute the following commands: + // - # docker compose-up server/docker/postgresql + // - # ./gradlew server + val openAI = OpenAI() + // val openAI = OpenAI(host = "http://localhost:8081/") + val model = openAI.DEFAULT_SERIALIZATION + + val scope = Conversation(LocalVectorStore(openAI.DEFAULT_EMBEDDING)) + + model + .promptStreaming( + Prompt("Make a spacecraft with a mission to Mars"), + scope = scope, + serializer = InterstellarCraft.serializer() + ) + .collect { element -> + when (element) { + is StreamedFunction.Property -> { + println("${element.path} = ${element.value}") + } + is StreamedFunction.Result -> { + println(element.value) } } - } + } } diff --git a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraftLocal.kt b/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraftLocal.kt deleted file mode 100644 index 261de9522..000000000 --- a/examples/kotlin/src/main/kotlin/com/xebia/functional/xef/conversation/streaming/SpaceCraftLocal.kt +++ /dev/null @@ -1,37 +0,0 @@ -package com.xebia.functional.xef.conversation.streaming - -import com.xebia.functional.xef.conversation.Conversation -import com.xebia.functional.xef.conversation.llm.openai.OpenAI -import com.xebia.functional.xef.llm.StreamedFunction -import com.xebia.functional.xef.prompt.Prompt -import com.xebia.functional.xef.store.LocalVectorStore - -/* - * This examples is the same than SpaceCraft.kt but using a local server - * - * To run this example, you need to: - * - Execute xef-server in local using the command: ./gradlew server - */ -suspend fun main() { - - val model = OpenAI(host = "http://localhost:8081/").DEFAULT_SERIALIZATION - - val scope = Conversation(LocalVectorStore(OpenAI.FromEnvironment.DEFAULT_EMBEDDING)) - - model - .promptStreaming( - Prompt("Make a spacecraft with a mission to Mars"), - scope = scope, - serializer = InterstellarCraft.serializer() - ) - .collect { element -> - when (element) { - is StreamedFunction.Property -> { - println("${element.path} = ${element.value}") - } - is StreamedFunction.Result -> { - println(element.value) - } - } - } -} diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt index 851293ac4..51e22573a 100644 --- a/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt @@ -37,6 +37,9 @@ object Server { persistenceService.addCollection() val ktorClient = HttpClient(CIO){ + engine { + requestTimeout = 0 // disabled + } install(Auth) install(Logging) { level = LogLevel.INFO diff --git a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/Routes.kt b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/Routes.kt index d633b5a36..009ded7eb 100644 --- a/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/Routes.kt +++ b/server/src/main/kotlin/com/xebia/functional/xef/server/http/routes/Routes.kt @@ -1,32 +1,20 @@ package com.xebia.functional.xef.server.http.routes import com.aallam.openai.api.BetaOpenAI -import com.aallam.openai.api.chat.ChatCompletionRequest -import com.xebia.functional.xef.conversation.Conversation -import com.xebia.functional.xef.prompt.configuration.PromptConfiguration -import com.xebia.functional.xef.conversation.llm.openai.* -import com.xebia.functional.xef.llm.StreamedFunction -import com.xebia.functional.xef.llm.models.chat.ChatCompletionRequest as XefChatCompletionRequest -import com.xebia.functional.xef.llm.models.chat.ChatCompletionResponse -import com.xebia.functional.xef.prompt.Prompt import com.xebia.functional.xef.server.services.PersistenceService import io.ktor.client.* import io.ktor.client.call.* import io.ktor.client.request.* +import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.server.application.* import io.ktor.server.auth.* import io.ktor.server.request.* import io.ktor.server.response.* import io.ktor.server.routing.* -import io.ktor.util.cio.* +import io.ktor.util.* import io.ktor.util.pipeline.* -import io.ktor.utils.io.* -import io.ktor.utils.io.core.* -import kotlinx.coroutines.cancel -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import kotlinx.serialization.encodeToString +import io.ktor.utils.io.jvm.javaio.* import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.boolean @@ -54,48 +42,74 @@ fun Routing.routes( authenticate("auth-bearer") { post("/chat/completions") { val token = call.getToken() - val context = call.receive() - val data = Json.decodeFromString(context) + val body = call.receive() + val data = Json.decodeFromString(body) val isStream = data["stream"]?.jsonPrimitive?.boolean ?: false if (!isStream) { - val response = client.request("$openAiUrl/chat/completions") { - headers { - bearerAuth(token) - } - contentType(ContentType.Application.Json) - method = HttpMethod.Post - setBody(context) - } - call.respond(response.body()) + client.makeRequest(call, "$openAiUrl/chat/completions", body, token) } else { - runBlocking { - client.preparePost("$openAiUrl/chat/completions") { - headers { - bearerAuth(token) - } - contentType(ContentType.Application.Json) - method = HttpMethod.Post - setBody(context) - }.execute { httpResponse -> - val channel: ByteReadChannel = httpResponse.body() - call.respondBytesWriter(contentType = ContentType.Application.Json) { - while (!channel.isClosedForRead) { - val packet = channel.readRemaining(DEFAULT_BUFFER_SIZE.toLong()) - while (!packet.isEmpty) { - val bytes = packet.readBytes() - writeStringUtf8(bytes.decodeToString()) - } - } - } - } - } + client.makeStreaming(call, "$openAiUrl/chat/completions", body, token) } } + + post("/embeddings") { + val token = call.getToken() + val context = call.receive() + client.makeRequest(call, "$openAiUrl/embeddings", context, token) + } + } +} + +private suspend fun HttpClient.makeRequest( + call: ApplicationCall, + url: String, + body: String, + token: String +) { + val response = this.request(url) { + headers { + bearerAuth(token) + } + contentType(ContentType.Application.Json) + method = HttpMethod.Post + setBody(body) } + call.response.headers.copyFrom(response.headers) + call.respond(response.status, response.body()) } +private suspend fun HttpClient.makeStreaming( + call: ApplicationCall, + url: String, + body: String, + token: String +) { + this.preparePost(url) { + headers { + bearerAuth(token) + } + contentType(ContentType.Application.Json) + method = HttpMethod.Post + setBody(body) + }.execute { httpResponse -> + call.response.headers.copyFrom(httpResponse.headers) + call.respondOutputStream { + httpResponse + .bodyAsChannel() + .copyTo(this@respondOutputStream) + } + } +} + +private fun ResponseHeaders.copyFrom(headers: Headers) = headers + .entries() + .filter { (key, _) -> !HttpHeaders.isUnsafe(key) } // setting unsafe headers results in exception + .forEach { (key, values) -> + values.forEach { value -> this.append(key, value) } + } + private fun ApplicationCall.getProvider(): Provider = request.headers["xef-provider"]?.toProvider() ?: Provider.OPENAI