-
Notifications
You must be signed in to change notification settings - Fork 3
/
completion.go
130 lines (110 loc) · 2.57 KB
/
completion.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"bytes"
"context"
"errors"
"io"
"time"
ai "github.com/sashabaranov/go-openai"
)
func ChatCompletionTask(ctx *ChatContext) <-chan *string {
ch := make(chan *string)
go chatCompletionStream(ctx, ch)
return ch
}
func chatCompletionStream(cc *ChatContext, channel chan<- *string) {
defer close(channel)
cc.Stats()
ctx, cancel := context.WithTimeout(cc, cc.Session.Config.ClientTimeout)
defer cancel()
stream, err := cc.AI.CreateChatCompletionStream(ctx, ai.ChatCompletionRequest{
MaxTokens: cc.Session.Config.MaxTokens,
Model: cc.Personality.Model,
Messages: cc.Session.GetHistory(),
Stream: true,
})
if err != nil {
senderror(err, channel)
return
}
defer stream.Close()
chunker := &Chunker{
Size: cc.Session.Config.Chunkmax,
Last: time.Now(),
Timeout: cc.Session.Config.Chunkdelay,
Buffer: &bytes.Buffer{},
}
for {
response, err := stream.Recv()
if err != nil {
send(chunker.Buffer.String(), channel)
if !errors.Is(err, io.EOF) {
senderror(err, channel)
}
return
}
if len(response.Choices) != 0 {
chunker.Buffer.WriteString(response.Choices[0].Delta.Content)
}
for {
if ready, chunk := chunker.Chunk(); ready {
send(string(*chunk), channel)
} else {
break
}
}
}
}
func senderror(err error, channel chan<- *string) {
e := err.Error()
channel <- &e
}
func send(chunk string, channel chan<- *string) {
channel <- &chunk
}
type Chunker struct {
Size int
Last time.Time
Buffer *bytes.Buffer
Timeout time.Duration
}
func (c *Chunker) Chunk() (bool, *[]byte) {
end := c.Size
if c.Buffer.Len() < end {
end = c.Buffer.Len()
}
// chunk on a newline in first chunksize
index := bytes.IndexByte(c.Buffer.Bytes()[:end], '\n')
if index != -1 {
chunk := c.Buffer.Next(index + 1)
c.Last = time.Now()
return true, &chunk
}
// chunk if full buffer satisfies chunk size
if c.Buffer.Len() >= c.Size {
chunk := c.Buffer.Next(c.Size)
c.Last = time.Now()
return true, &chunk
}
// chunk on boundary if n seconds have passed since the last chunk
if time.Since(c.Last) >= c.Timeout {
content := c.Buffer.Bytes()
index := c.Boundary(&content)
if index != -1 {
chunk := c.Buffer.Next(index + 1)
c.Last = time.Now()
return true, &chunk
}
}
// no chunk
return false, nil
}
// other languages are a thing, but for now...
func (c *Chunker) Boundary(s *[]byte) int {
for i := 0; i < len(*s)-1; i++ {
if ((*s)[i] == '.' || (*s)[i] == ':' || (*s)[i] == '!' || (*s)[i] == '?') && ((*s)[i+1] == ' ' || (*s)[i+1] == '\t') {
return i + 1
}
}
return -1
}