Skip to content

Commit

Permalink
GEODE-9602: QueryObserver improvements.
Browse files Browse the repository at this point in the history
- Make QueryObserverHolder thread-safe
- Allow having an observer per query by means of setting the observer
  in the query at the start of the execution.
- Invoke beforeIterationEvaluation and afterIterationEvaluation callbacks when
  query is using indexes.
  • Loading branch information
albertogpz committed Sep 16, 2021
1 parent d0113fc commit bf0c44c
Show file tree
Hide file tree
Showing 30 changed files with 261 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -339,7 +341,9 @@ public void testBugResultMismatch() throws Exception {
SelectResults rs1 = (SelectResults) q1.execute();
SelectResults rs2 = (SelectResults) q2.execute();

assertThatCode(() -> QueryUtils.union(rs1, rs2, null)).doesNotThrowAnyException();
ExecutionContext context = mock(ExecutionContext.class);
when(context.getObserver()).thenReturn(new QueryObserverAdapter());
assertThatCode(() -> QueryUtils.union(rs1, rs2, context)).doesNotThrowAnyException();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.Collection;
Expand Down Expand Up @@ -209,6 +211,8 @@ public void testCompareThrowsClassCastException() throws Exception {

private OrderByComparator createComparator() throws Exception {
StructTypeImpl objType = new StructTypeImpl();
return new OrderByComparator(null, objType, null);
ExecutionContext context = mock(ExecutionContext.class);
when(context.getObserver()).thenReturn(new QueryObserverAdapter());
return new OrderByComparator(null, objType, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.geode.cache.query.data.Address;
import org.apache.geode.cache.query.data.Employee;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.test.junit.categories.OQLQueryTest;
import org.apache.geode.test.junit.rules.ServerStarterRule;

Expand Down Expand Up @@ -85,6 +86,7 @@ public void setUp() throws Exception {
@After
public void tearDown() {
QueryObserverHolder.reset();
IndexManager.TEST_RANGEINDEX_ONLY = false;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -243,6 +245,59 @@ public void beforeAggregationsAndGroupByShouldBeCalledForAggregateFunctions() th
verify(myQueryObserver, times(queries.size())).beforeAggregationsAndGroupBy(any());
}

@Test
public void testBeforeAndAfterIterationEvaluateNoWhere() throws Exception {
Query query = queryService.newQuery(
"select count(*) from " + SEPARATOR + "portfolio p");

query.execute();
verify(myQueryObserver, times(0)).beforeIterationEvaluation(any(), any());
verify(myQueryObserver, times(0)).afterIterationEvaluation(any());
}

@Test
public void testBeforeAndAfterIterationEvaluateWithoutIndex() throws Exception {
Query query = queryService.newQuery(
"select count(*) from " + SEPARATOR + "portfolio p where p.isActive = true ");

query.execute();
verify(myQueryObserver, times(4)).beforeIterationEvaluation(any(), any());
verify(myQueryObserver, times(4)).afterIterationEvaluation(any());
}

@Test
public void testBeforeAndAfterIterationEvaluateWithCompactRangeIndex() throws Exception {
Query query = queryService.newQuery(
"select count(*) from " + SEPARATOR + "portfolio p where p.isActive = true ");
queryService.createIndex("isActiveIndex", "isActive", SEPARATOR + "portfolio");

query.execute();
verify(myQueryObserver, times(2)).beforeIterationEvaluation(any(), any());
verify(myQueryObserver, times(2)).afterIterationEvaluation(any());
assertThat(myQueryObserver.dbIndx[2] == myQueryObserver.usedIndx)
.as("Validate callback of Indexes").isTrue();
assertThat(myQueryObserver.unusedIndx == myQueryObserver.dbIndx[0]
|| myQueryObserver.unusedIndx == myQueryObserver.dbIndx[1])
.as("Validate callback of Indexes").isTrue();
}

@Test
public void testBeforeAndAfterIterationEvaluateWithRangeIndex() throws Exception {
IndexManager.TEST_RANGEINDEX_ONLY = true;
Query query = queryService.newQuery(
"select count(*) from " + SEPARATOR + "portfolio p where p.description = 'XXXX' ");
queryService.createIndex("descriptionIndex", "description", SEPARATOR + "portfolio");

query.execute();
verify(myQueryObserver, times(2)).beforeIterationEvaluation(any(), any());
verify(myQueryObserver, times(2)).afterIterationEvaluation(any());
assertThat(myQueryObserver.dbIndx[2] == myQueryObserver.usedIndx)
.as("Validate callback of Indexes").isTrue();
assertThat(myQueryObserver.unusedIndx == myQueryObserver.dbIndx[0]
|| myQueryObserver.unusedIndx == myQueryObserver.dbIndx[1])
.as("Validate callback of Indexes").isTrue();
}

private static class MyQueryObserverImpl extends QueryObserverAdapter {
private int j = 0;
private Index usedIndx = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.cache.query.internal;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -60,10 +61,12 @@ public class QueryTraceJUnitTest {
@Before
public void setUp() throws Exception {
CacheUtils.startCache();
DefaultQuery.testHook = new BeforeQueryExecutionHook();
}

@After
public void tearDown() throws Exception {
DefaultQuery.testHook = null;
CacheUtils.closeCache();
}

Expand Down Expand Up @@ -104,7 +107,11 @@ public void testTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -141,7 +148,11 @@ public void testTraceOnLocalRegionWithTracePrefix() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -183,7 +194,11 @@ public void testNegTraceOnPartitionedRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -223,7 +238,11 @@ public void testNegTraceOnLocalRegionWithTracePrefix() throws Exception {
assertFalse(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should not have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -262,7 +281,11 @@ public void testTraceOnPartitionedRegionWithTracePrefixNoComments() throws Excep
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -296,8 +319,11 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception {
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);
// The query should return all elements in region.

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
}
Expand Down Expand Up @@ -331,7 +357,11 @@ public void testTraceOnPartitionedRegionWithSmallTracePrefixNoComments() throws
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -366,7 +396,11 @@ public void testTraceOnLocalRegionWithSmallTracePrefixNoComments() throws Except
assertTrue(((DefaultQuery) query).isTraced());

SelectResults results = (SelectResults) query.execute();
assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver);

// The IndexTrackingObserver should have been set
BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook;
assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class);

// The query should return all elements in region.
assertEquals(region.size(), results.size());
QueryObserverHolder.reset();
Expand Down Expand Up @@ -438,4 +472,21 @@ public void testQueryFailLocalRegionWithSmallTracePrefixNoSpace() throws Excepti
}
}

private class BeforeQueryExecutionHook implements DefaultQuery.TestHook {
private QueryObserver observer = null;

@Override
public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
final ExecutionContext executionContext) {
switch (spot) {
case BEFORE_QUERY_EXECUTION:
observer = executionContext.getObserver();
break;
}
}

public QueryObserver getObserver() {
return observer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private SelectResults auxIterateEvaluate(CompiledValue operand, ExecutionContext
resultSet = QueryUtils.createResultCollection(context, elementType);
}

QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
try {
observer.startIteration(intermediateResults, operand);
Iterator iResultsIter = intermediateResults.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private SelectResults evaluateAndJunction(ExecutionContext context)
iterOperandsToSend = new CompiledJunction(cv, this.operator);
}
}
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
observer.beforeCartesianOfGroupJunctionsInAnAllGroupJunctionOfType_AND(results);
resultsSet = QueryUtils.cartesian(results, itrsForResultFields, expansionList, finalList,
context, iterOperandsToSend);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private SelectResults singleBaseCollectionFilterEvaluate(ExecutionContext contex
// before the index lookup
int op = reflectOnOperator(indexInfo._key());
// actual index lookup
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
List projAttrib = null;
/*
* Asif : First obtain the match level of index resultset. If the match level happens to be zero
Expand Down Expand Up @@ -535,7 +535,7 @@ private SelectResults doubleBaseCollectionFilterEvaluate(ExecutionContext contex
// each of the
// one dimensional array can be either genuine result object or StructImpl
// object.
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
context.cachePut(CompiledValue.INDEX_INFO, indxInfo);
/*
* Asif : If the independent Group of iterators passed is not null or the independent Group of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void mapOriginalOrderByColumns(ExecutionContext context)
public SelectResults evaluate(ExecutionContext context) throws FunctionDomainException,
TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
SelectResults selectResults = super.evaluate(context);
QueryObserverHolder.getInstance().beforeAggregationsAndGroupBy(selectResults);
context.getObserver().beforeAggregationsAndGroupBy(selectResults);

return this.applyAggregateAndGroupBy(selectResults, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ SelectResults singleBaseCollectionFilterEvaluate(ExecutionContext context,
}
}

QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
try {
Object evalColln = evaluateColln(context);
observer.beforeIndexLookup(indexInfo._index, TOK_EQ, evalColln);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ SelectResults auxIterateEvaluate(CompiledValue operand, ExecutionContext context
}


QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
try {
observer.startIteration(intermediateResults, operand);
Iterator iResultsIter = intermediateResults.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public SelectResults evaluate(ExecutionContext context) throws FunctionDomainExc

result = ((Filter) this.whereClause).filterEvaluate(context, null);
if (!(context.cacheGet(RESULT_TYPE) instanceof Boolean)) {
QueryObserverHolder.getInstance()
context.getObserver()
.beforeApplyingProjectionOnFilterEvaluatedResults(result);
result = applyProjectionOnCollection(result, context, !needsTopLevelOrdering);
}
Expand Down Expand Up @@ -691,7 +691,7 @@ private SelectResults doIterationEvaluate(ExecutionContext context, boolean eval
for (Iterator itr = tmpResults.iterator(); itr.hasNext();) {
Object currObj = itr.next();
rIter.setCurrent(currObj);
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
observer.beforeIterationEvaluation(rIter, currObj);
applyProjectionAndAddToResultSet(context, results, this.orderByAttrs == null);
}
Expand Down Expand Up @@ -773,7 +773,7 @@ private int doNestedIterations(int level, SelectResults results, ExecutionContex
boolean addToResults = true;
if (evaluateWhereClause) {
Object result = this.whereClause.evaluate(context);
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
observer.afterIterationEvaluation(result);
if (result == null) {
addToResults = false;
Expand Down Expand Up @@ -839,7 +839,7 @@ private int doNestedIterations(int level, SelectResults results, ExecutionContex

Object currObj = aSr;
rIter.setCurrent(currObj);
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
observer.beforeIterationEvaluation(rIter, currObj);
numElementsInResult = doNestedIterations(level + 1, results, context, evaluateWhereClause,
numElementsInResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public SelectResults filterEvaluate(ExecutionContext context, SelectResults inte
}
int op = _is_defined ? TOK_NE : TOK_EQ;
Object key = QueryService.UNDEFINED;
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
try {
observer.beforeIndexLookup(idxInfo[0]._index, op, key);
context.cachePut(CompiledValue.INDEX_INFO, idxInfo[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private SelectResults evaluateAndJunction(ExecutionContext context)
}
// Do the cartesian of the different group junction results.
// TODO:Asif Remove the time
QueryObserver observer = QueryObserverHolder.getInstance();
QueryObserver observer = context.getObserver();
observer.beforeCartesianOfGroupJunctionsInCompositeGroupJunctionOfType_AND(results);
SelectResults grpCartRs = QueryUtils.cartesian(results, itrsForResultFields, expansionList,
finalList, context, iterOp);
Expand Down
Loading

0 comments on commit bf0c44c

Please sign in to comment.