Skip to content

Commit f8de637

Browse files
authored
feat: support group by except tags (openGemini#612)
Signed-off-by: xiangyu5632 <[email protected]>
1 parent 196e210 commit f8de637

File tree

7 files changed

+1324
-1221
lines changed

7 files changed

+1324
-1221
lines changed

engine/executor/httpsender_transform.go

+36-3
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ func NewHttpChunkSender(opt *query.ProcessorOptions) *HttpChunkSender {
5959
}
6060

6161
func (w *HttpChunkSender) Write(chunk Chunk, lastChunk bool) bool {
62-
w.genRows(chunk)
62+
w.GenRows(chunk)
6363

6464
var chunkedRow models.Rows
6565
var partial bool
@@ -115,26 +115,53 @@ func (w *HttpChunkSender) Write(chunk Chunk, lastChunk bool) bool {
115115
return partial
116116
}
117117

118-
func (w *HttpChunkSender) genRows(chunk Chunk) {
118+
func (w *HttpChunkSender) GenRows(chunk Chunk) {
119119
if chunk == nil {
120120
return
121121
}
122122

123123
statistics.ExecutorStat.SinkRows.Push(int64(chunk.NumberOfRows()))
124124
rows := w.rowsGenerator.Generate(chunk, w.opt.Location)
125+
if w.opt.Except {
126+
for i := 0; i < len(rows); i++ {
127+
rows[i].Values = removeDuplicationValues(rows[i].Values)
128+
}
129+
}
125130

126131
// May next Chunk has the same tag as this buffRow
127132
if rows.Len() > 0 && w.buffRows.Len() > 0 {
128133
firstRow := rows[0]
129134
lastRow := w.buffRows[len(w.buffRows)-1]
130135
if lastRow.Name == firstRow.Name && hybridqp.EqualMap(lastRow.Tags, firstRow.Tags) {
131136
lastRow.Values = append(lastRow.Values, firstRow.Values...)
137+
if w.opt.Except {
138+
lastRow.Values = removeDuplicationValues(lastRow.Values)
139+
}
132140
rows = rows[1:]
133141
}
134142
}
135143
w.buffRows = append(w.buffRows, rows...)
136144
}
137145

146+
// removeDuplicationValues collapses each run of consecutive rows that share
// the same leading value (values[i][0]) down to the first row of that run.
//
// The compaction happens in place: the returned slice aliases the backing
// array of values, and the dropped rows are left in the tail beyond the
// returned length. Only adjacent rows are compared, so rows with equal leading
// values that are separated by a different one are both kept.
//
// Rows without any column are tolerated instead of causing an index panic:
// two adjacent empty rows count as duplicates, while an empty row next to a
// non-empty one does not.
func removeDuplicationValues(values [][]interface{}) [][]interface{} {
	if len(values) == 0 {
		return values
	}

	kept := 0 // index of the last row retained so far
	for i := 1; i < len(values); i++ {
		if sameLeadingValue(values[kept], values[i]) {
			continue
		}
		kept++
		if kept < i {
			// Swap rather than overwrite so the dropped row moves to the tail.
			values[kept], values[i] = values[i], values[kept]
		}
	}

	return values[:kept+1]
}

// sameLeadingValue reports whether rows a and b start with the same value.
// Empty rows are only considered equal to other empty rows.
func sameLeadingValue(a, b []interface{}) bool {
	if len(a) == 0 || len(b) == 0 {
		return len(a) == len(b)
	}
	return a[0] == b[0]
}
164+
138165
// GetRows transfer Chunk to models.Rows
139166
func (w *HttpChunkSender) GetRows(chunk Chunk) models.Rows {
140167
if chunk == nil {
@@ -370,6 +397,7 @@ func (g *RowsGenerator) Generate(chunk Chunk, loc *time.Location) models.Rows {
370397
tmpRows := g.allocRows(len(tagIndex))
371398
var start, end int
372399

400+
var lastRow *models.Row
373401
for index := 0; index < len(tagIndex); index++ {
374402
start = tagIndex[index]
375403
if start == tagIndex[len(tagIndex)-1] {
@@ -383,7 +411,12 @@ func (g *RowsGenerator) Generate(chunk Chunk, loc *time.Location) models.Rows {
383411
row.Tags = chunkTags[index].KeyValues()
384412
row.Columns = columnNames
385413
row.Values = g.buildValues(end, start, times, loc, columns)
386-
rows = append(rows, row)
414+
if lastRow != nil && hybridqp.EqualMap(lastRow.Tags, row.Tags) {
415+
lastRow.Values = append(lastRow.Values, row.Values...)
416+
} else {
417+
rows = append(rows, row)
418+
lastRow = row
419+
}
387420
}
388421
return rows
389422
}

engine/executor/httpsender_transform_test.go

+18-4
Original file line number | Diff line number | Diff line change
@@ -317,6 +317,19 @@ func (trans *MockGenDataTransform) GetInputNumber(_ executor.Port) int {
317317
return 0
318318
}
319319

320+
func TestGenRows(t *testing.T) {
321+
sender := executor.NewHttpChunkSender(&query.ProcessorOptions{
322+
Except: true,
323+
})
324+
325+
fields := mockFieldsAndTags()
326+
refs := varRefsFromFields(fields)
327+
inRowDataType := hybridqp.NewRowDataTypeImpl(refs...)
328+
chunk := genChunk(inRowDataType)
329+
330+
sender.GenRows(chunk)
331+
}
332+
320333
func TestGetRows(t *testing.T) {
321334
fields := mockFieldsAndTags()
322335
refs := varRefsFromFields(fields)
@@ -328,17 +341,18 @@ func TestGetRows(t *testing.T) {
328341

329342
g := &executor.RowsGenerator{}
330343
other := g.Generate(chunk, time.UTC)
331-
require.Equal(t, rows.Len(), len(other))
344+
require.GreaterOrEqual(t, rows.Len(), len(other))
332345

346+
valueIndex := 0
333347
for i := 0; i < rows.Len(); i++ {
334348
exp := rows[i]
335-
got := other[i]
349+
got := other[0]
336350
require.Equal(t, exp.Tags, got.Tags)
337351
require.Equal(t, exp.Columns, got.Columns)
338-
require.Equal(t, len(exp.Values), len(got.Values))
339352

340353
for j := 0; j < len(exp.Values); j++ {
341-
require.Equal(t, exp.Values[j], got.Values[j])
354+
require.Equal(t, exp.Values[j], got.Values[valueIndex])
355+
valueIndex++
342356
}
343357
}
344358
}

lib/util/lifted/influx/influxql/ast.go

+2
Original file line number | Diff line number | Diff line change
@@ -1434,6 +1434,8 @@ type SelectStatement struct {
14341434
// Expressions used for grouping the selection.
14351435
Dimensions Dimensions
14361436

1437+
ExceptDimensions Dimensions
1438+
14371439
// Whether to drop the given labels rather than keep them.
14381440
Without bool
14391441

lib/util/lifted/influx/influxql/sql.y

+39-26
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ func deal_Fill (fill interface{}) (FillOption , interface{},bool) {
114114
QUERY PARTITION
115115
TOKEN TOKENIZERS MATCH LIKE MATCHPHRASE CONFIG CONFIGS CLUSTER
116116
REPLICAS DETAIL DESTINATIONS
117-
SCHEMA INDEXES AUTO
117+
SCHEMA INDEXES AUTO EXCEPT
118118
%token <bool> DESC ASC
119119
%token <str> COMMA SEMICOLON LPAREN RPAREN REGEX
120120
%token <int> EQ NEQ LT LTE GT GTE DOT DOUBLECOLON NEQREGEX EQREGEX
@@ -159,7 +159,7 @@ func deal_Fill (fill interface{}) (FillOption , interface{},bool) {
159159
%type <dataType> COLUMN_VAREF_TYPE
160160
%type <sortfs> SORTFIELDS ORDER_CLAUSES
161161
%type <sortf> SORTFIELD
162-
%type <dimens> GROUP_BY_CLAUSE DIMENSION_NAMES
162+
%type <dimens> GROUP_BY_CLAUSE EXCEPT_CLAUSE DIMENSION_NAMES
163163
%type <dimen> DIMENSION_NAME
164164
%type <intSlice> OPTION_CLAUSES LIMIT_OFFSET_OPTION SLIMIT_SOFFSET_OPTION
165165
%type <inter> FILL_CLAUSE FILLCONTENT
@@ -428,20 +428,21 @@ STATEMENT:
428428
}
429429

430430
SELECT_STATEMENT:
431-
SELECT COLUMN_CLAUSES INTO_CLAUSE FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
431+
SELECT COLUMN_CLAUSES INTO_CLAUSE FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE EXCEPT_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
432432
{
433433
stmt := &SelectStatement{}
434434
stmt.Fields = $2
435435
stmt.Sources = $4
436436
stmt.Dimensions = $6
437+
stmt.ExceptDimensions = $7
437438
stmt.Condition = $5
438-
stmt.SortFields = $8
439-
stmt.Limit = $9[0]
440-
stmt.Offset = $9[1]
441-
stmt.SLimit = $9[2]
442-
stmt.SOffset = $9[3]
439+
stmt.SortFields = $9
440+
stmt.Limit = $10[0]
441+
stmt.Offset = $10[1]
442+
stmt.SLimit = $10[2]
443+
stmt.SOffset = $10[3]
443444

444-
tempfill,tempfillvalue,fillflag := deal_Fill($7)
445+
tempfill,tempfillvalue,fillflag := deal_Fill($8)
445446
if fillflag==false{
446447
yylex.Error("Invalid characters in fill")
447448
}else{
@@ -453,7 +454,7 @@ SELECT_STATEMENT:
453454
stmt.IsRawQuery = false
454455
}
455456
})
456-
stmt.Location = $10
457+
stmt.Location = $11
457458
if len($3) > 1{
458459
yylex.Error("into clause only support one measurement")
459460
}else if len($3) == 1{
@@ -468,21 +469,22 @@ SELECT_STATEMENT:
468469
}
469470
$$ = stmt
470471
}
471-
|SELECT HINT COLUMN_CLAUSES INTO_CLAUSE FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
472+
|SELECT HINT COLUMN_CLAUSES INTO_CLAUSE FROM_CLAUSE WHERE_CLAUSE GROUP_BY_CLAUSE EXCEPT_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
472473
{
473474
stmt := &SelectStatement{}
474475
stmt.Hints = $2
475476
stmt.Fields = $3
476477
stmt.Sources = $5
477478
stmt.Dimensions = $7
479+
stmt.ExceptDimensions = $8
478480
stmt.Condition = $6
479-
stmt.SortFields = $9
480-
stmt.Limit = $10[0]
481-
stmt.Offset = $10[1]
482-
stmt.SLimit = $10[2]
483-
stmt.SOffset = $10[3]
481+
stmt.SortFields = $10
482+
stmt.Limit = $11[0]
483+
stmt.Offset = $11[1]
484+
stmt.SLimit = $11[2]
485+
stmt.SOffset = $11[3]
484486

485-
tempfill,tempfillvalue,fillflag := deal_Fill($8)
487+
tempfill,tempfillvalue,fillflag := deal_Fill($9)
486488
if fillflag==false{
487489
yylex.Error("Invalid characters in fill")
488490
}else{
@@ -494,7 +496,7 @@ SELECT_STATEMENT:
494496
stmt.IsRawQuery = false
495497
}
496498
})
497-
stmt.Location = $11
499+
stmt.Location = $12
498500
if len($4) > 1{
499501
yylex.Error("into clause only support one measurement")
500502
}else if len($4) == 1{
@@ -509,19 +511,20 @@ SELECT_STATEMENT:
509511
}
510512
$$ = stmt
511513
}
512-
|SELECT COLUMN_CLAUSES WHERE_CLAUSE GROUP_BY_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
514+
|SELECT COLUMN_CLAUSES WHERE_CLAUSE GROUP_BY_CLAUSE EXCEPT_CLAUSE FILL_CLAUSE ORDER_CLAUSES OPTION_CLAUSES TIME_ZONE
513515
{
514516
stmt := &SelectStatement{}
515517
stmt.Fields = $2
516518
stmt.Dimensions = $4
519+
stmt.ExceptDimensions = $5
517520
stmt.Condition = $3
518-
stmt.SortFields = $6
519-
stmt.Limit = $7[0]
520-
stmt.Offset = $7[1]
521-
stmt.SLimit = $7[2]
522-
stmt.SOffset = $7[3]
521+
stmt.SortFields = $7
522+
stmt.Limit = $8[0]
523+
stmt.Offset = $8[1]
524+
stmt.SLimit = $8[2]
525+
stmt.SOffset = $8[3]
523526

524-
tempfill,tempfillvalue,fillflag := deal_Fill($5)
527+
tempfill,tempfillvalue,fillflag := deal_Fill($6)
525528
if fillflag==false{
526529
yylex.Error("Invalid characters in fill")
527530
}else{
@@ -533,7 +536,7 @@ SELECT_STATEMENT:
533536
stmt.IsRawQuery = false
534537
}
535538
})
536-
stmt.Location = $8
539+
stmt.Location = $9
537540
$$ = stmt
538541
}
539542

@@ -876,6 +879,16 @@ GROUP_BY_CLAUSE:
876879
$$ = nil
877880
}
878881

882+
/* EXCEPT_CLAUSE is the optional `EXCEPT <dimension names>` part of a SELECT
 * statement. It yields the parsed dimension list, or nil when the clause is
 * absent. */
EXCEPT_CLAUSE:
883+
EXCEPT DIMENSION_NAMES
884+
{
885+
$$ = $2
886+
}
887+
|
888+
{
889+
$$ = nil
890+
}
891+
879892
DIMENSION_NAMES:
880893
DIMENSION_NAME
881894
{

lib/util/lifted/influx/influxql/token.go

+1
Original file line number | Diff line number | Diff line change
@@ -338,6 +338,7 @@ var tokens = [...]string{
338338
DETAIL: "DETAIL",
339339
COMPACT: "COMPACT",
340340
AUTO: "AUTO",
341+
EXCEPT: "EXCEPT",
341342
}
342343

343344
var keywords map[string]int

0 commit comments

Comments
 (0)