Skip to content

Commit

Permalink
maybe this will fix it
Browse files Browse the repository at this point in the history
  • Loading branch information
clintropolis committed Aug 16, 2023
1 parent 868a5c3 commit 0d3eeb8
Show file tree
Hide file tree
Showing 11 changed files with 19 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -343,7 +342,6 @@ public boolean useParallelMergePool()
},
forkJoinPool,
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
Expand Down Expand Up @@ -366,7 +365,6 @@ public long getMaxQueuedBytes()
new BrokerParallelMergeConfig(),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new JoinableFactoryWrapper(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())),
new NoopServiceEmitter()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/**
*
*/
@Deprecated
public class ConfigProvider<T> implements Provider<T>
{
private static final Logger log = new Logger(ConfigProvider.class);
Expand All @@ -40,11 +41,6 @@ public static <T> void bind(Binder binder, Class<T> clazz)
binder.bind(clazz).toProvider(of(clazz)).in(LazySingleton.class);
}

public static <T> void bind(Binder binder, Class<T> clazz, Map<String, String> replacements)
{
binder.bind(clazz).toProvider(of(clazz, replacements)).in(LazySingleton.class);
}

public static <T> Provider<T> of(Class<T> clazz)
{
return of(clazz, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.DruidServerMetadata;
Expand Down Expand Up @@ -127,7 +126,6 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final BrokerParallelMergeConfig parallelMergeConfig;
private final ForkJoinPool pool;
private final QueryScheduler scheduler;
private final JoinableFactoryWrapper joinableFactoryWrapper;
private final ServiceEmitter emitter;

@Inject
Expand All @@ -142,7 +140,6 @@ public CachingClusteredClient(
BrokerParallelMergeConfig parallelMergeConfig,
@Merging ForkJoinPool pool,
QueryScheduler scheduler,
JoinableFactoryWrapper joinableFactoryWrapper,
ServiceEmitter emitter
)
{
Expand All @@ -156,7 +153,6 @@ public CachingClusteredClient(
this.parallelMergeConfig = parallelMergeConfig;
this.pool = pool;
this.scheduler = scheduler;
this.joinableFactoryWrapper = joinableFactoryWrapper;
this.emitter = emitter;

if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -320,6 +319,12 @@ public long getMaxQueuedBytes()
},
new BrokerParallelMergeConfig()
{
@Override
public boolean useParallelMergePool()
{
return true;
}

@Override
public int getParallelism()
{
Expand All @@ -336,7 +341,6 @@ public int getDefaultMaxQueryParallelism()
},
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.coordination.ServerManagerTest;
import org.apache.druid.server.coordination.ServerType;
Expand Down Expand Up @@ -139,7 +138,6 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments()
Mockito.mock(BrokerParallelMergeConfig.class),
ForkJoinPool.commonPool(),
queryScheduler,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.ServerType;
Expand Down Expand Up @@ -2831,18 +2830,24 @@ public long getMaxQueuedBytes()
},
new BrokerParallelMergeConfig()
{
@Override
public boolean useParallelMergePool()
{
return true;
}

@Override
public int getParallelism()
{
// fixed so same behavior across all test environments
return 4;
return 1;
}

@Override
public int getDefaultMaxQueryParallelism()
{
// fixed so same behavior across all test environments
return 4;
return 1;
}
},
ForkJoinPool.commonPool(),
Expand All @@ -2852,7 +2857,6 @@ public int getDefaultMaxQueryParallelism()
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
),
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -108,7 +107,6 @@ protected QueryRunnerBasedOnClusteredClientTestBase()
{
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(
CLOSER,
USE_PARALLEL_MERGE_POOL_CONFIGURED,
() -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD
);

Expand Down Expand Up @@ -145,7 +143,6 @@ public void setupTestBase()
QueryStackTests.getParallelMergeConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new NoopServiceEmitter()
);
servers = new ArrayList<>();
Expand Down
17 changes: 2 additions & 15 deletions server/src/test/java/org/apache/druid/server/QueryStackTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,7 @@ public boolean useParallelMergePool()
}
};
}
public static DruidProcessingConfig getProcessingConfig(
boolean useParallelMergePoolConfigured,
final int mergeBuffers
)
public static DruidProcessingConfig getProcessingConfig(final int mergeBuffers)
{
return new DruidProcessingConfig()
{
Expand Down Expand Up @@ -245,27 +242,17 @@ public int getNumMergeBuffers()
*/
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer)
{
return createQueryRunnerFactoryConglomerate(closer, true, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
}

public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
final Supplier<Integer> minTopNThresholdSupplier
)
{
return createQueryRunnerFactoryConglomerate(closer, true, minTopNThresholdSupplier);
return createQueryRunnerFactoryConglomerate(closer, () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD);
}

public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
final boolean useParallelMergePoolConfigured,
final Supplier<Integer> minTopNThresholdSupplier
)
{
return createQueryRunnerFactoryConglomerate(
closer,
getProcessingConfig(
useParallelMergePoolConfigured,
DEFAULT_NUM_MERGE_BUFFERS
),
minTopNThresholdSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public QueryRunnerFactoryConglomerate createCongolmerate(
} else {
return QueryStackTests.createQueryRunnerFactoryConglomerate(
resourceCloser,
QueryStackTests.getProcessingConfig(true, builder.mergeBufferCount)
QueryStackTests.getProcessingConfig(builder.mergeBufferCount)
);
}
}
Expand Down

0 comments on commit 0d3eeb8

Please sign in to comment.