Skip to content

Commit b2507ab

Browse files
authored
feat(prom): groupByAll/without do not use schemaDims of prom (openGemini#604)
Signed-off-by: lihanxue <[email protected]>
1 parent 1e0adf1 commit b2507ab

24 files changed

+887
-305
lines changed

engine/executor/agg_transform.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ func (trans *StreamAggregateTransform) isSameGroup(c Chunk) bool {
591591
}
592592

593593
// Case1: tag and time are grouped by.
594-
if trans.opt.Dimensions != nil && !trans.opt.Interval.IsZero() {
594+
if (trans.opt.Dimensions != nil || trans.opt.IsPromGroupAllOrWithout()) && !trans.opt.Interval.IsZero() {
595595
if bytes.Equal(nextChunk.Tags()[0].Subset(trans.opt.Dimensions),
596596
c.Tags()[len(c.Tags())-1].Subset(trans.opt.Dimensions)) {
597597
startTime, endTime := trans.opt.Window(c.TimeByIndex(c.NumberOfRows() - 1))
@@ -601,7 +601,7 @@ func (trans *StreamAggregateTransform) isSameGroup(c Chunk) bool {
601601
}
602602

603603
// Case2: only tag is grouped by.
604-
if trans.opt.Dimensions != nil && trans.opt.Interval.IsZero() {
604+
if (trans.opt.Dimensions != nil || trans.opt.IsPromGroupAllOrWithout()) && trans.opt.Interval.IsZero() {
605605
return bytes.Equal(nextChunk.Tags()[0].Subset(trans.opt.Dimensions),
606606
c.Tags()[len(c.Tags())-1].Subset(trans.opt.Dimensions))
607607
}
@@ -623,7 +623,7 @@ func (trans *StreamAggregateTransform) isSameTag(c Chunk) bool {
623623
}
624624

625625
// Case1: tag is grouped by.
626-
if trans.opt.Dimensions != nil {
626+
if trans.opt.Dimensions != nil || trans.opt.IsPromGroupAllOrWithout() {
627627
return bytes.Equal(nextChunk.Tags()[0].Subset(trans.opt.Dimensions),
628628
c.Tags()[len(c.Tags())-1].Subset(trans.opt.Dimensions))
629629
}

engine/executor/chunk_tags.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ func NewChunkTags(pts influx.PointTags, dimensions []string) *ChunkTags {
5757
return c
5858
}
5959

60+
func NewChunkTagsWithoutDims(pts influx.PointTags, withoutDims []string) *ChunkTags {
61+
c := &ChunkTags{}
62+
c.encodeTagsWithoutDims(pts, withoutDims)
63+
return c
64+
}
65+
6066
func NewChunkTagsByTagKVs(k []string, v []string) *ChunkTags {
6167
c := &ChunkTags{}
6268
c.encodeTagsByTagKVs(k, v)
@@ -131,6 +137,19 @@ func (ct *ChunkTags) Reset() {
131137
}
132138

133139
func (ct *ChunkTags) KeepKeys(keys []string) *ChunkTags {
140+
var m influx.PointTags
141+
var ss []string
142+
for _, kv := range ct.decodeTags() {
143+
if ContainDim(keys, kv[0]) {
144+
ss = append(ss, kv[0])
145+
m = append(m, influx.Tag{Key: kv[0], Value: kv[1], IsArray: false})
146+
}
147+
}
148+
sort.Sort(&m)
149+
return NewChunkTags(m, ss)
150+
}
151+
152+
func (ct *ChunkTags) RemoveKeys(keys []string) *ChunkTags {
134153
var m influx.PointTags
135154
var ss []string
136155
for _, kv := range ct.decodeTags() {
@@ -172,10 +191,10 @@ func (ct *ChunkTags) PointTags() influx.PointTags {
172191
// ContainDim reports whether src is an element of des.
func ContainDim(des []string, src string) bool {
	for _, d := range des {
		if d == src {
			return true
		}
	}
	return false
}
180199

181200
func (ct *ChunkTags) encodeTagsByTagKVs(keys []string, vals []string) {
@@ -239,6 +258,38 @@ func (ct *ChunkTags) encodeTags(pts influx.PointTags, keys []string) {
239258
ct.subset = append(head, ct.subset...)
240259
}
241260

261+
func (ct *ChunkTags) encodeTagsWithoutDims(pts influx.PointTags, withoutKeys []string) {
262+
ct.offsets = make([]uint16, 0, (len(pts)-len(withoutKeys))*2)
263+
if len(pts) == 0 {
264+
return
265+
}
266+
i, j := 0, 0
267+
for i < len(withoutKeys) && j < len(pts) {
268+
if withoutKeys[i] < pts[j].Key {
269+
i++
270+
} else if withoutKeys[i] > pts[j].Key {
271+
ct.subset = append(ct.subset, Str2bytes(pts[j].Key+influx.StringSplit)...)
272+
ct.offsets = append(ct.offsets, uint16(len(ct.subset)))
273+
ct.subset = append(ct.subset, Str2bytes(pts[j].Value+influx.StringSplit)...)
274+
ct.offsets = append(ct.offsets, uint16(len(ct.subset)))
275+
j++
276+
} else {
277+
i++
278+
j++
279+
}
280+
}
281+
for ; j < len(pts); j++ {
282+
ct.subset = append(ct.subset, Str2bytes(pts[j].Key+influx.StringSplit)...)
283+
ct.offsets = append(ct.offsets, uint16(len(ct.subset)))
284+
ct.subset = append(ct.subset, Str2bytes(pts[j].Value+influx.StringSplit)...)
285+
ct.offsets = append(ct.offsets, uint16(len(ct.subset)))
286+
}
287+
288+
ct.offsets = append([]uint16{uint16(len(ct.offsets))}, ct.offsets...)
289+
head := util.Uint16Slice2byte(ct.offsets)
290+
ct.subset = append(head, ct.subset...)
291+
}
292+
242293
func (ct *ChunkTags) decodeTags() [][]string {
243294
if len(ct.subset) == 0 {
244295
return nil

engine/executor/executor_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -642,3 +642,17 @@ func TestOneReaderExchangeExecutorBuilder(t *testing.T) {
642642
t.Error("OneShardExchangeExecutorBuilder test error")
643643
}
644644
}
645+
646+
func TestNewChunkTagsWithoutDims(t *testing.T) {
647+
pts := make(influx.PointTags, 0)
648+
pts = append(pts, influx.Tag{Key: "tk1", Value: "tv1"})
649+
pts = append(pts, influx.Tag{Key: "tk2", Value: "tv2"})
650+
pts = append(pts, influx.Tag{Key: "tk3", Value: "tv3"})
651+
withoutDims := []string{"tk0", "tk2"}
652+
ct := executor.NewChunkTagsWithoutDims(pts, withoutDims)
653+
k, v := ct.GetChunkTagAndValues()
654+
assert.Equal(t, k[0], "tk1")
655+
assert.Equal(t, v[0], "tv1")
656+
assert.Equal(t, k[1], "tk3")
657+
assert.Equal(t, v[1], "tv3")
658+
}

engine/executor/logic_plan.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -1769,6 +1769,22 @@ func explainIterms(writer LogicalPlanWriter, id uint64, mstName string, dimensio
17691769
writer.Item("ID", id)
17701770
}
17711771

1772+
func explainItermsReader(writer LogicalPlanWriter, id uint64, mstName string, dimensions []string, isProm bool, without bool, groupByAll bool) {
1773+
var builder strings.Builder
1774+
for i, d := range dimensions {
1775+
if i != 0 {
1776+
builder.WriteString(", ")
1777+
}
1778+
builder.WriteString(d)
1779+
}
1780+
writer.Item("dimensions", builder.String())
1781+
writer.Item("mstName", mstName)
1782+
writer.Item("ID", id)
1783+
writer.Item("isProm", isProm)
1784+
writer.Item("without", without)
1785+
writer.Item("groupByAll", groupByAll)
1786+
}
1787+
17721788
type LogicalReader struct {
17731789
cursor []interface{}
17741790
hasPreAgg bool
@@ -1849,7 +1865,7 @@ func (p *LogicalReader) SetCursor(cursor []interface{}) {
18491865
}
18501866

18511867
func (p *LogicalReader) ExplainIterms(writer LogicalPlanWriter) {
1852-
explainIterms(writer, p.id, p.mstName, p.dimensions)
1868+
explainItermsReader(writer, p.id, p.mstName, p.dimensions, p.schema.Options().IsPromQuery(), p.schema.Options().IsWithout(), p.schema.Options().IsGroupByAllDims())
18531869
}
18541870

18551871
func (p *LogicalReader) Explain(writer LogicalPlanWriter) {
@@ -2188,6 +2204,9 @@ func (p *LogicalGroupBy) ExplainIterms(writer LogicalPlanWriter) {
21882204
builder.WriteString(d)
21892205
}
21902206
writer.Item("dimensions", builder.String())
2207+
writer.Item("isProm", p.schema.Options().IsPromQuery())
2208+
writer.Item("without", p.schema.Options().IsWithout())
2209+
writer.Item("groupByAll", p.schema.Options().IsGroupByAllDims())
21912210
}
21922211

21932212
func (p *LogicalGroupBy) Explain(writer LogicalPlanWriter) {
@@ -2255,6 +2274,9 @@ func (p *LogicalOrderBy) ExplainIterms(writer LogicalPlanWriter) {
22552274
builder.WriteString(d)
22562275
}
22572276
writer.Item("dimensions", builder.String())
2277+
writer.Item("isProm", p.schema.Options().IsPromQuery())
2278+
writer.Item("without", p.schema.Options().IsWithout())
2279+
writer.Item("groupByAll", p.schema.Options().IsGroupByAllDims())
22582280
}
22592281

22602282
func (p *LogicalOrderBy) Explain(writer LogicalPlanWriter) {
@@ -2844,7 +2866,7 @@ func (b *LogicalPlanBuilderImpl) CreateSegmentPlan(schema hybridqp.Catalog) (hyb
28442866
if schema.HasCall() && schema.CanAggPushDown() {
28452867
b.Exchange(READER_EXCHANGE, nil)
28462868
}
2847-
if len(schema.Options().GetDimensions()) > 0 && (!schema.HasCall() || schema.HasCall() && !schema.CanAggPushDown()) {
2869+
if (len(schema.Options().GetDimensions()) > 0 || schema.Options().IsPromGroupAllOrWithout()) && (!schema.HasCall() || schema.HasCall() && !schema.CanAggPushDown()) {
28482870
b.Exchange(READER_EXCHANGE, nil)
28492871
}
28502872
b.Exchange(SEGMENT_EXCHANGE, nil)

engine/executor/logic_plan_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,17 @@ func Test_BuildFullJoinQueryPlant(t *testing.T) {
10041004
t.Fatal("TestBuildFullJoinQueryPlan error")
10051005
}
10061006
}
1007+
1008+
func Test_ExplainNode(t *testing.T) {
1009+
planWriter := executor.NewLogicalPlanWriterImpl(&strings.Builder{})
1010+
opt := query.ProcessorOptions{}
1011+
fields := []*influxql.Field{
1012+
{Expr: &influxql.VarRef{Val: "m1.f1", Type: influxql.Float, Alias: ""}, Alias: ""},
1013+
}
1014+
clonames := []string{"f1"}
1015+
schema := executor.NewQuerySchema(fields, clonames, &opt, nil)
1016+
logicSeries := executor.NewLogicalSeries(schema)
1017+
groupBy := executor.NewLogicalGroupBy(logicSeries, schema)
1018+
orderBy := executor.NewLogicalOrderBy(groupBy, schema)
1019+
orderBy.Explain(planWriter)
1020+
}

engine/executor/orderby_transform.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,20 @@ func (trans *OrderByTransform) closeChunkChannel() {
186186
close(trans.currChunk)
187187
}
188188

189+
func (trans *OrderByTransform) GetCurrTags(i int) string {
190+
return string(trans.currTags[i].subset)
191+
}
192+
189193
func (trans *OrderByTransform) GetTagAndIndexes(chunk Chunk) {
194+
var t *ChunkTags
190195
for i := range chunk.Tags() {
191-
t := chunk.Tags()[i].KeepKeys(trans.dimensions)
196+
if trans.opt.IsPromGroupAll() {
197+
t = &chunk.Tags()[i]
198+
} else if trans.opt.IsWithout() {
199+
t = chunk.Tags()[i].RemoveKeys(trans.opt.Dimensions)
200+
} else {
201+
t = chunk.Tags()[i].KeepKeys(trans.dimensions)
202+
}
192203
index := chunk.TagIndex()
193204
if i == 0 || !bytes.Equal(t.Subset(trans.dimensions), trans.currTags[len(trans.currTags)-1].Subset(trans.dimensions)) {
194205
trans.currTags = append(trans.currTags, *t)
@@ -199,8 +210,15 @@ func (trans *OrderByTransform) GetTagAndIndexes(chunk Chunk) {
199210

200211
func (trans *OrderByTransform) GetTagsResetTagIndexes(chunk Chunk) {
201212
trans.currTagIndex = trans.currTagIndex[:0]
213+
var t *ChunkTags
202214
for i := range chunk.Tags() {
203-
t := chunk.Tags()[i].KeepKeys(trans.dimensions)
215+
if trans.opt.IsPromGroupAll() {
216+
t = &chunk.Tags()[i]
217+
} else if trans.opt.IsWithout() {
218+
t = chunk.Tags()[i].RemoveKeys(trans.opt.Dimensions)
219+
} else {
220+
t = chunk.Tags()[i].KeepKeys(trans.dimensions)
221+
}
204222
index := chunk.TagIndex()
205223
if len(trans.currTags) == 0 || !bytes.Equal(t.Subset(trans.dimensions), trans.currTags[len(trans.currTags)-1].Subset(trans.dimensions)) {
206224
trans.currTags = append(trans.currTags, *t)
+113
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package executor_test
18+
19+
import (
20+
"testing"
21+
22+
"github.com/openGemini/openGemini/engine/executor"
23+
"github.com/openGemini/openGemini/engine/hybridqp"
24+
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
25+
"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
26+
"github.com/stretchr/testify/assert"
27+
)
28+
29+
func buildOrderByTransformRowDataType() hybridqp.RowDataType {
30+
rowDataType := hybridqp.NewRowDataTypeImpl(
31+
influxql.VarRef{Val: "value", Type: influxql.Float},
32+
)
33+
return rowDataType
34+
}
35+
36+
func buildOrderByTransformInChunk() executor.Chunk {
37+
rowDataType := buildOrderByTransformRowDataType()
38+
b := executor.NewChunkBuilder(rowDataType)
39+
chunk := b.NewChunk("tag1")
40+
chunk.AppendTimes([]int64{1, 3, 4})
41+
chunk.AddTagAndIndex(*ParseChunkTags("tag1=tag1val,tag2=tag2val"), 0)
42+
chunk.AddIntervalIndex(0)
43+
AppendFloatValues(chunk, 0, []float64{1.1, 3.3, 4.4}, []bool{true, true, true})
44+
return chunk
45+
}
46+
47+
func TestOrderByTransformInitPromDims1(t *testing.T) {
48+
inRowDataType := buildOrderByTransformRowDataType()
49+
outRowDataType := inRowDataType
50+
exprOpt := []hybridqp.ExprOptions{
51+
{
52+
Expr: &influxql.VarRef{Val: "val0", Type: influxql.Float},
53+
Ref: influxql.VarRef{Val: "val0", Type: influxql.Float},
54+
},
55+
}
56+
chunk := buildOrderByTransformInChunk()
57+
opt1 := query.ProcessorOptions{
58+
PromQuery: true,
59+
GroupByAllDims: true,
60+
}
61+
trans1 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt1, nil)
62+
trans1.GetTagAndIndexes(chunk)
63+
assert.Equal(t, trans1.GetCurrTags(0), "\x04\x00\x05\x00\r\x00\x12\x00\x1a\x00tag1\x00tag1val\x00tag2\x00tag2val\x00")
64+
opt2 := query.ProcessorOptions{
65+
PromQuery: true,
66+
Without: true,
67+
Dimensions: []string{"tag1"},
68+
}
69+
trans2 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt2, nil)
70+
trans2.GetTagAndIndexes(chunk)
71+
assert.Equal(t, trans2.GetCurrTags(0), "\x02\x00\x05\x00\r\x00tag2\x00tag2val\x00")
72+
opt3 := query.ProcessorOptions{
73+
PromQuery: true,
74+
Dimensions: []string{"tag1"},
75+
}
76+
trans3 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt3, opt3.Dimensions)
77+
trans3.GetTagAndIndexes(chunk)
78+
assert.Equal(t, trans3.GetCurrTags(0), "\x02\x00\x05\x00\r\x00tag1\x00tag1val\x00")
79+
}
80+
81+
func TestOrderByTransformInitPromDims2(t *testing.T) {
82+
inRowDataType := buildOrderByTransformRowDataType()
83+
outRowDataType := inRowDataType
84+
exprOpt := []hybridqp.ExprOptions{
85+
{
86+
Expr: &influxql.VarRef{Val: "val0", Type: influxql.Float},
87+
Ref: influxql.VarRef{Val: "val0", Type: influxql.Float},
88+
},
89+
}
90+
chunk := buildOrderByTransformInChunk()
91+
opt1 := query.ProcessorOptions{
92+
PromQuery: true,
93+
GroupByAllDims: true,
94+
}
95+
trans1 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt1, nil)
96+
trans1.GetTagsResetTagIndexes(chunk)
97+
assert.Equal(t, trans1.GetCurrTags(0), "\x04\x00\x05\x00\r\x00\x12\x00\x1a\x00tag1\x00tag1val\x00tag2\x00tag2val\x00")
98+
opt2 := query.ProcessorOptions{
99+
PromQuery: true,
100+
Without: true,
101+
Dimensions: []string{"tag1"},
102+
}
103+
trans2 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt2, nil)
104+
trans2.GetTagsResetTagIndexes(chunk)
105+
assert.Equal(t, trans2.GetCurrTags(0), "\x02\x00\x05\x00\r\x00tag2\x00tag2val\x00")
106+
opt3 := query.ProcessorOptions{
107+
PromQuery: true,
108+
Dimensions: []string{"tag1"},
109+
}
110+
trans3 := executor.NewOrderByTransform(inRowDataType, outRowDataType, exprOpt, &opt3, opt3.Dimensions)
111+
trans3.GetTagsResetTagIndexes(chunk)
112+
assert.Equal(t, trans3.GetCurrTags(0), "\x02\x00\x05\x00\r\x00tag1\x00tag1val\x00")
113+
}

engine/executor/subquery.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ func (b *SubQueryBuilder) newSubOptions(ctx context.Context, opt query.Processor
3838
Chunked: opt.Chunked,
3939
ChunkSize: opt.ChunkSize,
4040
RowsChan: opt.RowsChan,
41-
IsPromQuery: opt.PromQuery,
4241
})
4342

4443
if err != nil {
@@ -57,10 +56,15 @@ func (b *SubQueryBuilder) newSubOptions(ctx context.Context, opt query.Processor
5756
}
5857
}
5958

60-
pushDownDimension := GetInnerDimensions(opt.Dimensions, subOpt.Dimensions)
61-
subOpt.Dimensions = pushDownDimension
62-
for d := range opt.GroupBy {
63-
subOpt.GroupBy[d] = struct{}{}
59+
if !opt.Without {
60+
pushDownDimension := GetInnerDimensions(opt.Dimensions, subOpt.Dimensions)
61+
subOpt.Dimensions = pushDownDimension
62+
for d := range opt.GroupBy {
63+
subOpt.GroupBy[d] = struct{}{}
64+
}
65+
if opt.PromQuery && len(pushDownDimension) > 0 {
66+
subOpt.GroupByAllDims = opt.GroupByAllDims
67+
}
6468
}
6569

6670
valuer := &influxql.NowValuer{Location: b.stmt.Location}
@@ -85,6 +89,7 @@ func (b *SubQueryBuilder) newSubOptions(ctx context.Context, opt query.Processor
8589
subOpt.Fill = influxql.NoFill
8690
}
8791
subOpt.PromQuery = opt.PromQuery
92+
subOpt.Without = b.stmt.Without
8893
subOpt.Ordered = opt.Ordered
8994
subOpt.HintType = opt.HintType
9095
subOpt.StmtId = opt.StmtId

0 commit comments

Comments
 (0)