Skip to content

Commit 0fa5724

Browse files
gaodayuebinmahone
gaodayue
authored and committed
KYLIN-2501 Stream Aggregate GTRecords at Query Server
1 parent fa3ee3f commit 0fa5724

26 files changed

+704
-210
lines changed

core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java

+4
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,10 @@ public boolean isSkippingEmptySegments() {
802802
return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true"));
803803
}
804804

805+
public boolean isStreamAggregateEnabled() {
806+
return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
807+
}
808+
805809
@Deprecated // A limit is fine even if it is large. This config is meaningless since we already have a scan threshold.
806810
public int getStoragePushDownLimitMax() {
807811
return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));

core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
import java.nio.ByteBuffer;
2121
import java.util.BitSet;
22+
import java.util.Iterator;
2223

23-
public class ImmutableBitSet {
24+
public class ImmutableBitSet implements Iterable<Integer> {
2425

2526
public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
2627

@@ -168,4 +169,30 @@ public ImmutableBitSet deserialize(ByteBuffer in) {
168169
return new ImmutableBitSet(bitSet);
169170
}
170171
};
172+
173+
/**
174+
* Iterate over the positions of true value.
175+
* @return the iterator
176+
*/
177+
@Override
178+
public Iterator<Integer> iterator() {
179+
return new Iterator<Integer>() {
180+
int index = 0;
181+
182+
@Override
183+
public boolean hasNext() {
184+
return index < arr.length;
185+
}
186+
187+
@Override
188+
public Integer next() {
189+
return arr[index++];
190+
}
191+
192+
@Override
193+
public void remove() {
194+
throw new UnsupportedOperationException();
195+
}
196+
};
197+
}
171198
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.kylin;
20+
21+
import org.apache.kylin.gridtable.GTInfo;
22+
import org.apache.kylin.gridtable.GTRecord;
23+
import org.apache.kylin.gridtable.IGTScanner;
24+
25+
import java.io.IOException;
26+
import java.util.Iterator;
27+
28+
import static com.google.common.base.Preconditions.checkNotNull;
29+
30+
/**
31+
* A {@link IGTScanner} which forwards all its method calls to another scanner.
32+
*
33+
* @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
34+
*/
35+
public class GTForwardingScanner implements IGTScanner {
36+
protected IGTScanner delegated;
37+
38+
protected GTForwardingScanner(IGTScanner delegated) {
39+
this.delegated = checkNotNull(delegated, "delegated");
40+
}
41+
42+
@Override
43+
public GTInfo getInfo() {
44+
return delegated.getInfo();
45+
}
46+
47+
@Override
48+
public void close() throws IOException {
49+
delegated.close();
50+
}
51+
52+
@Override
53+
public Iterator<GTRecord> iterator() {
54+
return delegated.iterator();
55+
}
56+
}

core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java

-18
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,11 @@
1818

1919
package org.apache.kylin.cube.gridtable;
2020

21-
import java.util.Map;
22-
23-
import org.apache.kylin.common.util.Dictionary;
24-
import org.apache.kylin.cube.CubeSegment;
2521
import org.apache.kylin.cube.cuboid.Cuboid;
26-
import org.apache.kylin.cube.kv.CubeDimEncMap;
27-
import org.apache.kylin.cube.model.CubeDesc;
2822
import org.apache.kylin.dimension.IDimensionEncodingMap;
2923
import org.apache.kylin.gridtable.GTInfo;
30-
import org.apache.kylin.metadata.model.TblColRef;
3124

3225
public class CubeGridTable {
33-
34-
public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
35-
Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
36-
return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg));
37-
}
38-
39-
public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
40-
Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
41-
return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap));
42-
}
43-
4426
public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) {
4527
CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
4628

core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java

+18
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,29 @@ public int getIndexOf(TblColRef dimension) {
140140
return i == null ? -1 : i.intValue();
141141
}
142142

143+
public int[] getDimIndexes(Collection<TblColRef> dims) {
144+
int[] result = new int[dims.size()];
145+
int i = 0;
146+
for (TblColRef dim : dims) {
147+
result[i++] = getIndexOf(dim);
148+
}
149+
return result;
150+
}
151+
143152
public int getIndexOf(FunctionDesc metric) {
144153
Integer r = metrics2gt.get(metric);
145154
return r == null ? -1 : r;
146155
}
147156

157+
public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) {
158+
int[] result = new int[metrics.size()];
159+
int i = 0;
160+
for (FunctionDesc metric : metrics) {
161+
result[i++] = getIndexOf(metric);
162+
}
163+
return result;
164+
}
165+
148166
public List<TblColRef> getCuboidDimensionsInGTOrder() {
149167
return cuboid.getColumns();
150168
}

core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kylin.cube.cuboid.Cuboid;
3939
import org.apache.kylin.cube.cuboid.CuboidScheduler;
4040
import org.apache.kylin.cube.gridtable.CubeGridTable;
41+
import org.apache.kylin.cube.kv.CubeDimEncMap;
4142
import org.apache.kylin.cube.model.CubeDesc;
4243
import org.apache.kylin.gridtable.GTAggregateScanner;
4344
import org.apache.kylin.gridtable.GTBuilder;
@@ -108,7 +109,10 @@ public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<Tb
108109
}
109110

110111
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
111-
GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
112+
GTInfo info = CubeGridTable.newGTInfo(
113+
Cuboid.findById(cubeDesc, cuboidID),
114+
new CubeDimEncMap(cubeDesc, dictionaryMap)
115+
);
112116

113117
// The several store implementations below are very similar in performance. ConcurrentDiskStore is the simplest.
114118
// MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);

core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.kylin.measure.BufferedMeasureCodec;
4646
import org.apache.kylin.measure.MeasureAggregator;
4747
import org.apache.kylin.measure.MeasureAggregators;
48-
import org.apache.kylin.metadata.datatype.DataType;
4948
import org.slf4j.Logger;
5049
import org.slf4j.LoggerFactory;
5150

@@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner {
6362
final ImmutableBitSet metrics;
6463
final String[] metricsAggrFuncs;
6564
final IGTScanner inputScanner;
65+
final BufferedMeasureCodec measureCodec;
6666
final AggregationCache aggrCache;
6767
final long spillThreshold; // 0 means no memory control && no spill
6868
final int storagePushDownLimit;//default to be Int.MAX
@@ -86,6 +86,7 @@ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean sp
8686
this.metrics = req.getAggrMetrics();
8787
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
8888
this.inputScanner = inputScanner;
89+
this.measureCodec = req.createMeasureCodec();
8990
this.aggrCache = new AggregationCache();
9091
this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
9192
this.aggrMask = new boolean[metricsAggrFuncs.length];
@@ -175,7 +176,6 @@ class AggregationCache implements Closeable {
175176
final int keyLength;
176177
final boolean[] compareMask;
177178
boolean compareAll = true;
178-
final BufferedMeasureCodec measureCodec;
179179

180180
final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
181181
@Override
@@ -213,18 +213,6 @@ public AggregationCache() {
213213
keyLength = compareMask.length;
214214
dumps = Lists.newArrayList();
215215
aggBufMap = createBuffMap();
216-
measureCodec = createMeasureCodec();
217-
}
218-
219-
private BufferedMeasureCodec createMeasureCodec() {
220-
DataType[] types = new DataType[metrics.trueBitCount()];
221-
for (int i = 0; i < types.length; i++) {
222-
types[i] = info.getColumnType(metrics.trueBitAt(i));
223-
}
224-
225-
BufferedMeasureCodec result = new BufferedMeasureCodec(types);
226-
result.setBufferSize(info.getMaxColumnLength(metrics));
227-
return result;
228216
}
229217

230218
private boolean[] createCompareMask() {

core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java

+6-16
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.NoSuchElementException;
2626
import java.util.Set;
2727

28+
import org.apache.kylin.GTForwardingScanner;
2829
import org.apache.kylin.common.util.ByteArray;
2930
import org.apache.kylin.common.util.BytesUtil;
3031
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -33,17 +34,16 @@
3334
import org.apache.kylin.metadata.model.TblColRef;
3435
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
3536

36-
public class GTFilterScanner implements IGTScanner {
37+
public class GTFilterScanner extends GTForwardingScanner {
3738

38-
final private IGTScanner inputScanner;
3939
final private TupleFilter filter;
4040
final private IFilterCodeSystem<ByteArray> filterCodeSystem;
4141
final private IEvaluatableTuple oneTuple; // avoid instance creation
4242

4343
private GTRecord next = null;
4444

45-
public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
46-
this.inputScanner = inputScanner;
45+
public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
46+
super(delegated);
4747
this.filter = req.getFilterPushDown();
4848
this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
4949
this.oneTuple = new IEvaluatableTuple() {
@@ -53,25 +53,15 @@ public Object getValue(TblColRef col) {
5353
}
5454
};
5555

56-
if (TupleFilter.isEvaluableRecursively(filter) == false)
56+
if (!TupleFilter.isEvaluableRecursively(filter))
5757
throw new IllegalArgumentException();
5858
}
5959

60-
@Override
61-
public GTInfo getInfo() {
62-
return inputScanner.getInfo();
63-
}
64-
65-
@Override
66-
public void close() throws IOException {
67-
inputScanner.close();
68-
}
69-
7060
@Override
7161
public Iterator<GTRecord> iterator() {
7262
return new Iterator<GTRecord>() {
7363

74-
private Iterator<GTRecord> inputIterator = inputScanner.iterator();
64+
private Iterator<GTRecord> inputIterator = delegated.iterator();
7565
private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter);
7666

7767
@Override

0 commit comments

Comments
 (0)