Skip to content

Commit d649e91

Browse files
author
gsheffi
committed
For the PR
1 parent 743547f commit d649e91

32 files changed

+4221
-1207
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
23+
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
24+
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
25+
import org.apache.druid.data.input.InputRow;
26+
import org.apache.druid.hll.HyperLogLogHash;
27+
import org.apache.druid.java.util.common.logger.Logger;
28+
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
29+
import org.apache.druid.segment.incremental.IncrementalIndex;
30+
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
31+
import org.apache.druid.segment.serde.ComplexMetrics;
32+
import org.openjdk.jmh.annotations.Benchmark;
33+
import org.openjdk.jmh.annotations.Threads;
34+
import org.openjdk.jmh.annotations.BenchmarkMode;
35+
import org.openjdk.jmh.annotations.Measurement;
36+
import org.openjdk.jmh.annotations.Mode;
37+
import org.openjdk.jmh.annotations.OutputTimeUnit;
38+
import org.openjdk.jmh.annotations.Param;
39+
import org.openjdk.jmh.annotations.Scope;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.annotations.State;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
import org.openjdk.jmh.annotations.Level;
44+
import org.openjdk.jmh.infra.Blackhole;
45+
46+
import java.io.IOException;
47+
import java.util.ArrayList;
48+
import java.util.concurrent.TimeUnit;
49+
50+
@State(Scope.Benchmark)
51+
@Warmup(iterations = 10)
52+
@Measurement(iterations = 25)
53+
public class OakIncrementalIndexIngestionBenchmark
54+
{
55+
@Param({"10000", "75000"})
56+
private int rowsPerSegment;
57+
58+
@Param({"basic"})
59+
private String schema;
60+
61+
@Param({"true", "false"})
62+
private boolean rollup;
63+
64+
@Param({"true", "false"})
65+
private boolean onheap;
66+
67+
private static final Logger log = new Logger(OakIncrementalIndexIngestionBenchmark.class);
68+
private static final int RNG_SEED = 9999;
69+
70+
private IncrementalIndex incIndex;
71+
private ArrayList<InputRow> rows;
72+
private BenchmarkSchemaInfo schemaInfo;
73+
74+
@Setup
75+
public void setup() throws IOException
76+
{
77+
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
78+
79+
rows = new ArrayList<InputRow>();
80+
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
81+
82+
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
83+
schemaInfo.getColumnSchemas(),
84+
RNG_SEED,
85+
schemaInfo.getDataInterval(),
86+
rowsPerSegment
87+
);
88+
89+
for (int i = 0; i < rowsPerSegment; i++) {
90+
InputRow row = gen.nextRow();
91+
if (i % 10000 == 0) {
92+
log.info(i + " rows generated.");
93+
}
94+
rows.add(row);
95+
}
96+
}
97+
98+
@Setup(Level.Iteration)
99+
public void setup2() throws IOException
100+
{
101+
incIndex = makeIncIndex();
102+
}
103+
104+
private IncrementalIndex makeIncIndex()
105+
{
106+
if (onheap) {
107+
return new IncrementalIndex.Builder()
108+
.setIndexSchema(
109+
new IncrementalIndexSchema.Builder()
110+
.withMetrics(schemaInfo.getAggsArray())
111+
.withRollup(rollup)
112+
.build()
113+
)
114+
.setReportParseExceptions(false)
115+
.setMaxRowCount(rowsPerSegment * 16)
116+
.buildOnheap();
117+
} else {
118+
return new IncrementalIndex.Builder()
119+
.setIndexSchema(
120+
new IncrementalIndexSchema.Builder()
121+
.withMetrics(schemaInfo.getAggsArray())
122+
.withRollup(rollup)
123+
.build()
124+
)
125+
.setReportParseExceptions(false)
126+
.setMaxRowCount(rowsPerSegment * 16)
127+
.buildOffheapOak();
128+
}
129+
}
130+
131+
@Benchmark
132+
@BenchmarkMode(Mode.SingleShotTime)
133+
@OutputTimeUnit(TimeUnit.SECONDS)
134+
@Threads(1)
135+
public void addRows(Blackhole blackhole) throws Exception
136+
{
137+
long time = System.currentTimeMillis();
138+
for (int i = 0; i < rowsPerSegment; i++) {
139+
InputRow row = rows.get(i);
140+
int rv = incIndex.add(row).getRowCount();
141+
blackhole.consume(rv);
142+
}
143+
long duration = System.currentTimeMillis() - time;
144+
double throughput = (10 * rowsPerSegment) / (double) duration;
145+
log.info("Throughput: " + throughput + " ops/ms");
146+
}
147+
148+
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
23+
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
24+
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
25+
import org.apache.druid.data.input.InputRow;
26+
import org.apache.druid.hll.HyperLogLogHash;
27+
import org.apache.druid.java.util.common.granularity.Granularities;
28+
import org.apache.druid.java.util.common.guava.Sequence;
29+
import org.apache.druid.java.util.common.logger.Logger;
30+
import org.apache.druid.js.JavaScriptConfig;
31+
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
32+
import org.apache.druid.query.dimension.DefaultDimensionSpec;
33+
import org.apache.druid.query.filter.BoundDimFilter;
34+
import org.apache.druid.query.filter.DimFilter;
35+
import org.apache.druid.query.filter.InDimFilter;
36+
import org.apache.druid.query.filter.JavaScriptDimFilter;
37+
import org.apache.druid.query.filter.OrDimFilter;
38+
import org.apache.druid.query.filter.RegexDimFilter;
39+
import org.apache.druid.query.filter.SearchQueryDimFilter;
40+
import org.apache.druid.query.ordering.StringComparators;
41+
import org.apache.druid.query.search.ContainsSearchQuerySpec;
42+
import org.apache.druid.segment.Cursor;
43+
import org.apache.druid.segment.DimensionSelector;
44+
import org.apache.druid.segment.VirtualColumns;
45+
import org.apache.druid.segment.data.IndexedInts;
46+
import org.apache.druid.segment.incremental.IncrementalIndex;
47+
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
48+
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
49+
import org.apache.druid.segment.serde.ComplexMetrics;
50+
import org.openjdk.jmh.annotations.Benchmark;
51+
import org.openjdk.jmh.annotations.Threads;
52+
import org.openjdk.jmh.annotations.BenchmarkMode;
53+
import org.openjdk.jmh.annotations.Fork;
54+
import org.openjdk.jmh.annotations.Measurement;
55+
import org.openjdk.jmh.annotations.Mode;
56+
import org.openjdk.jmh.annotations.OutputTimeUnit;
57+
import org.openjdk.jmh.annotations.Param;
58+
import org.openjdk.jmh.annotations.Scope;
59+
import org.openjdk.jmh.annotations.Setup;
60+
import org.openjdk.jmh.annotations.State;
61+
import org.openjdk.jmh.annotations.Warmup;
62+
import org.openjdk.jmh.infra.Blackhole;
63+
64+
import java.io.IOException;
65+
import java.util.ArrayList;
66+
import java.util.Arrays;
67+
import java.util.Collections;
68+
import java.util.List;
69+
import java.util.concurrent.TimeUnit;
70+
71+
@State(Scope.Benchmark)
72+
@Fork(value = 1)
73+
@Warmup(iterations = 10)
74+
@Measurement(iterations = 25)
75+
public class OakIncrementalIndexReadBenchmark
76+
{
77+
@Param({"75000"})
78+
private int rowsPerSegment;
79+
80+
@Param({"basic"})
81+
private String schema;
82+
83+
@Param({"true", "false"})
84+
private boolean rollup;
85+
86+
@Param({"true", "false"})
87+
private boolean onheap;
88+
89+
private static final Logger log = new Logger(OakIncrementalIndexReadBenchmark.class);
90+
private static final int RNG_SEED = 9999;
91+
private IncrementalIndex incIndex;
92+
93+
private BenchmarkSchemaInfo schemaInfo;
94+
95+
@Setup
96+
public void setup() throws IOException
97+
{
98+
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
99+
100+
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
101+
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
102+
}
103+
104+
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
105+
106+
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
107+
schemaInfo.getColumnSchemas(),
108+
RNG_SEED,
109+
schemaInfo.getDataInterval(),
110+
rowsPerSegment
111+
);
112+
113+
incIndex = makeIncIndex();
114+
115+
for (int j = 0; j < rowsPerSegment; j++) {
116+
InputRow row = gen.nextRow();
117+
if (j % 10000 == 0) {
118+
log.info(j + " rows generated.");
119+
}
120+
incIndex.add(row);
121+
}
122+
123+
}
124+
125+
private IncrementalIndex makeIncIndex()
126+
{
127+
if (onheap) {
128+
return new IncrementalIndex.Builder()
129+
.setIndexSchema(
130+
new IncrementalIndexSchema.Builder()
131+
.withMetrics(schemaInfo.getAggsArray())
132+
.withRollup(rollup)
133+
.build()
134+
)
135+
.setReportParseExceptions(false)
136+
.setMaxRowCount(rowsPerSegment * 16)
137+
.buildOnheap();
138+
} else {
139+
return new IncrementalIndex.Builder()
140+
.setIndexSchema(
141+
new IncrementalIndexSchema.Builder()
142+
.withMetrics(schemaInfo.getAggsArray())
143+
.withRollup(rollup)
144+
.build()
145+
)
146+
.setReportParseExceptions(false)
147+
.setMaxRowCount(rowsPerSegment * 16)
148+
.buildOffheapOak();
149+
}
150+
}
151+
152+
@Benchmark
153+
@BenchmarkMode(Mode.SingleShotTime)
154+
@OutputTimeUnit(TimeUnit.SECONDS)
155+
@Threads(1)
156+
public void read(Blackhole blackhole)
157+
{
158+
long time = System.currentTimeMillis();
159+
IncrementalIndexStorageAdapter sa = new IncrementalIndexStorageAdapter(incIndex);
160+
Sequence<Cursor> cursors = makeCursors(sa, null);
161+
Cursor cursor = cursors.limit(1).toList().get(0);
162+
163+
List<DimensionSelector> selectors = new ArrayList<>();
164+
selectors.add(makeDimensionSelector(cursor, "dimSequential"));
165+
selectors.add(makeDimensionSelector(cursor, "dimZipf"));
166+
selectors.add(makeDimensionSelector(cursor, "dimUniform"));
167+
selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull"));
168+
169+
cursor.reset();
170+
while (!cursor.isDone()) {
171+
for (DimensionSelector selector : selectors) {
172+
IndexedInts row = selector.getRow();
173+
blackhole.consume(selector.lookupName(row.get(0)));
174+
}
175+
cursor.advance();
176+
}
177+
long duration = System.currentTimeMillis() - time;
178+
double throughput = rowsPerSegment / (double) duration;
179+
log.info("Throughput: " + throughput + " ops/ms");
180+
}
181+
182+
@Benchmark
183+
@BenchmarkMode(Mode.SingleShotTime)
184+
@OutputTimeUnit(TimeUnit.SECONDS)
185+
@Threads(1)
186+
public void readWithFilters(Blackhole blackhole)
187+
{
188+
long time = System.currentTimeMillis();
189+
DimFilter filter = new OrDimFilter(
190+
Arrays.asList(
191+
new BoundDimFilter("dimSequential", "-1", "-1", true, true, null, null, StringComparators.ALPHANUMERIC),
192+
new JavaScriptDimFilter("dimSequential", "function(x) { return false }", null, JavaScriptConfig.getEnabledInstance()),
193+
new RegexDimFilter("dimSequential", "X", null),
194+
new SearchQueryDimFilter("dimSequential", new ContainsSearchQuerySpec("X", false), null),
195+
new InDimFilter("dimSequential", Collections.singletonList("X"), null)
196+
)
197+
);
198+
199+
IncrementalIndexStorageAdapter sa = new IncrementalIndexStorageAdapter(incIndex);
200+
Sequence<Cursor> cursors = makeCursors(sa, filter);
201+
Cursor cursor = cursors.limit(1).toList().get(0);
202+
203+
List<DimensionSelector> selectors = new ArrayList<>();
204+
selectors.add(makeDimensionSelector(cursor, "dimSequential"));
205+
selectors.add(makeDimensionSelector(cursor, "dimZipf"));
206+
selectors.add(makeDimensionSelector(cursor, "dimUniform"));
207+
selectors.add(makeDimensionSelector(cursor, "dimSequentialHalfNull"));
208+
209+
cursor.reset();
210+
while (!cursor.isDone()) {
211+
for (DimensionSelector selector : selectors) {
212+
IndexedInts row = selector.getRow();
213+
blackhole.consume(selector.lookupName(row.get(0)));
214+
}
215+
cursor.advance();
216+
}
217+
long duration = System.currentTimeMillis() - time;
218+
double throughput = rowsPerSegment / (double) duration;
219+
log.info("Throughput: " + throughput + " ops/ms");
220+
}
221+
222+
private Sequence<Cursor> makeCursors(IncrementalIndexStorageAdapter sa, DimFilter filter)
223+
{
224+
return sa.makeCursors(
225+
filter == null ? null : filter.toFilter(),
226+
schemaInfo.getDataInterval(),
227+
VirtualColumns.EMPTY,
228+
Granularities.ALL,
229+
false,
230+
null
231+
);
232+
}
233+
234+
private static DimensionSelector makeDimensionSelector(Cursor cursor, String name)
235+
{
236+
return cursor.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec(name, null));
237+
}
238+
}

0 commit comments

Comments
 (0)