Skip to content

Commit 16d6fac

Browse files
authored
chore: add the configuration item maxLineSize (openGemini#592)
Signed-off-by: gueFDF <[email protected]>
1 parent 12dc503 commit 16d6fac

File tree

5 files changed

+44
-6
lines changed

5 files changed

+44
-6
lines changed

config/openGemini.conf

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
# https-private-key = ""
8787
# time-filter-protection = false
8888
# parallel-query-in-batch-enabled = true
89+
# max-line-size = 65536
8990

9091
[data]
9192
store-ingest-addr = "{{addr}}:8400"

lib/util/lifted/influx/httpd/config/config.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ const (
3636
// DefaultMaxRowNum is the maximum row number of a query result.
3737
DefaultMaxRowNum = 1000000
3838

39-
DefaultBlockSize = 64 * 1024
39+
DefaultBlockSize = 64 * 1024
40+
DefaultMaxLineSize = 1024 * 1024
4041
)
4142

4243
// Config represents a configuration for a HTTP service.
@@ -87,6 +88,7 @@ type Config struct {
8788
ReadBlockSize toml.Size `toml:"read-block-size"`
8889
TimeFilterProtection bool `toml:"time-filter-protection"`
8990
CPUThreshold int `toml:"cpu-threshold"`
91+
MaxLineSize int `toml:"max-line-size"`
9092
}
9193

9294
func CombineDomain(domain, addr string) string {
@@ -135,6 +137,7 @@ func NewConfig() Config {
135137
ChunkReaderParallel: cpu.GetCpuNum(),
136138
ReadBlockSize: toml.Size(DefaultBlockSize),
137139
TimeFilterProtection: false,
140+
MaxLineSize: DefaultMaxLineSize,
138141
}
139142
}
140143

lib/util/lifted/influx/httpd/handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1351,7 +1351,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta2.
13511351
tsMultiplier = 1e9 * 3600
13521352
}
13531353

1354-
ctx := influx.GetStreamContext(body)
1354+
ctx := influx.GetStreamContext(body, h.Config.MaxLineSize)
13551355
defer influx.PutStreamContext(ctx)
13561356

13571357
var numPtsParse, numPtsInsert int

lib/util/lifted/vm/protoparser/influx/streamparser.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (ctx *streamContext) Read(blockSize int) bool {
9191
if ctx.err != nil {
9292
return false
9393
}
94-
ctx.ReqBuf, ctx.tailBuf, ctx.err = ReadLinesBlockExt(ctx.br, ctx.ReqBuf, ctx.tailBuf, maxLineSize, blockSize)
94+
ctx.ReqBuf, ctx.tailBuf, ctx.err = ReadLinesBlockExt(ctx.br, ctx.ReqBuf, ctx.tailBuf, ctx.MaxLineSize, blockSize)
9595
if ctx.err != nil {
9696
if ctx.err != io.EOF {
9797
ctx.err = fmt.Errorf("cannot read influx line protocol data: %w", ctx.err)
@@ -111,6 +111,7 @@ type streamContext struct {
111111
ErrLock sync.Mutex
112112
UnmarshalErr error // unmarshal points failed, 400 error code
113113
CallbackErr error
114+
MaxLineSize int
114115
}
115116

116117
func (ctx *streamContext) Error() error {
@@ -129,7 +130,7 @@ func (ctx *streamContext) reset() {
129130
ctx.UnmarshalErr = nil
130131
}
131132

132-
func GetStreamContext(r io.Reader) *streamContext {
133+
func GetStreamContext(r io.Reader, maxLineSize int) *streamContext {
133134
select {
134135
case ctx := <-streamContextPoolCh:
135136
ctx.br.Reset(r)
@@ -138,11 +139,12 @@ func GetStreamContext(r io.Reader) *streamContext {
138139
if v := streamContextPool.Get(); v != nil {
139140
ctx := v.(*streamContext)
140141
ctx.br.Reset(r)
142+
ctx.MaxLineSize = maxLineSize
141143
return ctx
142144
}
143145
return &streamContext{
144-
br: bufio.NewReaderSize(r, 64*1024),
145-
}
146+
br: bufio.NewReaderSize(r, 64*1024),
147+
MaxLineSize: maxLineSize}
146148
}
147149
}
148150

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package influx_test
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
7+
"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
8+
)
9+
10+
func TestGetStreamContextMaxLineSize(t *testing.T) {
11+
data := "12345678901234567890123456789012" // length == 32
12+
13+
ctx1 := influx.GetStreamContext(bytes.NewBuffer([]byte(data)), 20)
14+
ctx2 := influx.GetStreamContext(bytes.NewBuffer([]byte(data)), 32)
15+
ctx3 := influx.GetStreamContext(bytes.NewBuffer([]byte(data)), 33)
16+
17+
//expect false
18+
if ctx1.Read(len(data)) {
19+
t.Errorf("expected Read to return false when data length exceeds maxLineSize")
20+
}
21+
22+
//expext true
23+
if !ctx2.Read(len(data)) {
24+
t.Errorf("expected Read to return true when data length exceeds maxLineSize")
25+
}
26+
27+
//expext true
28+
if !ctx3.Read(len(data)) {
29+
t.Errorf("expected Read to return true when data length exceeds maxLineSize")
30+
}
31+
32+
}

0 commit comments

Comments
 (0)