|
18 | 18 |
|
19 | 19 | package org.apache.kylin.storage.gtrecord;
|
20 | 20 |
|
| 21 | +import com.google.common.base.Function; |
21 | 22 | import com.google.common.collect.Iterators;
|
22 | 23 | import com.google.common.collect.Lists;
|
23 | 24 | import org.apache.kylin.common.util.ImmutableBitSet;
|
@@ -69,21 +70,22 @@ public void close() throws IOException {
|
69 | 70 |
|
70 | 71 | @Override
|
71 | 72 | public Iterator<GTRecord> iterator() {
|
72 |
| - List<PartitionResultIterator> partitionResults = Lists.newArrayList(); |
73 |
| - while (blocks.hasNext()) { |
74 |
| - partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns)); |
| 73 | + Iterator<PartitionResultIterator> iterators = Iterators.transform(blocks, new Function<byte[], PartitionResultIterator>() { |
| 74 | + public PartitionResultIterator apply(byte[] input) { |
| 75 | + return new PartitionResultIterator(input, info, columns); |
| 76 | + } |
| 77 | + }); |
| 78 | + |
| 79 | + if (!needSorted) { |
| 80 | + logger.debug("Using Iterators.concat to pipeline partition results"); |
| 81 | + return Iterators.concat(iterators); |
75 | 82 | }
|
76 | 83 |
|
| 84 | + List<PartitionResultIterator> partitionResults = Lists.newArrayList(iterators); |
77 | 85 | if (partitionResults.size() == 1) {
|
78 | 86 | return partitionResults.get(0);
|
79 | 87 | }
|
80 |
| - |
81 |
| - if (!needSorted) { |
82 |
| - logger.debug("Using Iterators.concat to merge partition results"); |
83 |
| - return Iterators.concat(partitionResults.iterator()); |
84 |
| - } |
85 |
| - |
86 |
| - logger.debug("Using SortMergedPartitionResultIterator to merge partition results"); |
| 88 | + logger.debug("Using SortMergedPartitionResultIterator to merge {} partition results", partitionResults.size()); |
87 | 89 | return new SortMergedPartitionResultIterator(partitionResults, info, GTRecord.getComparator(groupByDims));
|
88 | 90 | }
|
89 | 91 | }
|
0 commit comments