Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,16 @@ private void onSearchResponse(SearchResponse searchResponse) {
return;
}

// searchResponse may be null if the search was optimized to a noop
if (searchResponse == null) {
logger.debug("No indexing necessary for job [{}], saving state and shutting down.", getJobId());
// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure),
e -> doSaveState(finishAndSetState(), position.get(), this::afterFinishOrFailure)));
return;
}

// allowPartialSearchResults is set to false, so we should never see shard failures here
assert (searchResponse.getShardFailures().length == 0);
stats.markStartProcessing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@

public class AsyncTwoPhaseIndexerTests extends ESTestCase {

AtomicBoolean isFinished = new AtomicBoolean(false);
AtomicBoolean isStopped = new AtomicBoolean(false);
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final AtomicBoolean isStopped = new AtomicBoolean(false);

@Before
public void reset() {
Expand All @@ -57,12 +57,14 @@ private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
// test the execution order
private volatile int step;
private final boolean stoppedBeforeFinished;
private final boolean noIndices;

protected MockIndexer(ThreadPool threadPool, AtomicReference<IndexerState> initialState,
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) {
Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished, boolean noIndices) {
super(threadPool, initialState, initialPosition, new MockJobStats());
this.latch = latch;
this.stoppedBeforeFinished = stoppedBeforeFinished;
this.noIndices = noIndices;
}

@Override
Expand Down Expand Up @@ -97,12 +99,19 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
assertThat(step, equalTo(1));
++step;
final SearchResponseSections sections = new SearchResponseSections(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
null, false, null, null, 1);

// block till latch has been counted down, simulating network latency
awaitForLatch();

if (noIndices) {
// simulate no indices being searched due to optimizations
nextPhase.onResponse(null);
return;
}

final SearchResponseSections sections = new SearchResponseSections(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
null, false, null, null, 1);
nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
}

Expand All @@ -115,7 +124,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
// for stop before finished we do not know if its stopped before are after the search
if (stoppedBeforeFinished == false) {
assertThat(step, equalTo(4));
assertThat(step, equalTo(noIndices ? 3 : 4));
}
++step;
next.run();
Expand All @@ -128,7 +137,7 @@ protected void onFailure(Exception exc) {

@Override
protected void onFinish(ActionListener<Void> listener) {
assertThat(step, equalTo(3));
assertThat(step, equalTo(noIndices ? 2 : 3));
++step;
listener.onResponse(null);
assertTrue(isFinished.compareAndSet(false, true));
Expand Down Expand Up @@ -356,9 +365,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}

private class MockThreadPool extends TestThreadPool {
private static class MockThreadPool extends TestThreadPool {

private List<TimeValue> delays = new ArrayList<>();
private final List<TimeValue> delays = new ArrayList<>();

MockThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
super(name, Settings.EMPTY, customBuilders);
Expand All @@ -381,7 +390,7 @@ public void testStateMachine() throws Exception {
final ThreadPool threadPool = new TestThreadPool(getTestName());
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
Expand Down Expand Up @@ -419,12 +428,39 @@ public void testStateMachineBrokenSearch() throws Exception {
}
}

public void testZeroIndicesWhileIndexing() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ThreadPool threadPool = new TestThreadPool(getTestName());
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, false, true);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
assertBusy(() -> assertThat(indexer.getPosition(), equalTo(2)));

countDownLatch.countDown();
assertBusy(() -> assertTrue(isFinished.get()));
assertThat(indexer.getPosition(), equalTo(2));

assertFalse(isStopped.get());
assertThat(indexer.getStep(), equalTo(4));
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
assertTrue(indexer.abort());
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
}
}

public void testStop_WhileIndexing() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ThreadPool threadPool = new TestThreadPool(getTestName());
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true);
MockIndexer indexer = new MockIndexer(threadPool, state, 2, countDownLatch, true, false);
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>

injectPointInTimeIfNeeded(
buildSearchRequest(),
ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure)
ActionListener.wrap(pitSearchRequest -> doSearch(pitSearchRequest, nextPhase), nextPhase::onFailure)
);
}

Expand Down Expand Up @@ -393,12 +393,12 @@ private void injectPointInTimeIfNeeded(
Tuple<String, SearchRequest> namedSearchRequest,
ActionListener<Tuple<String, SearchRequest>> listener
) {
if (disablePit) {
SearchRequest searchRequest = namedSearchRequest.v2();
if (disablePit || searchRequest.indices().length == 0) {
listener.onResponse(namedSearchRequest);
return;
}

SearchRequest searchRequest = namedSearchRequest.v2();
PointInTimeBuilder pit = namedPits.get(namedSearchRequest.v1());
if (pit != null) {
searchRequest.source().pointInTimeBuilder(pit);
Expand Down Expand Up @@ -455,22 +455,30 @@ private void injectPointInTimeIfNeeded(
);
}

private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", namedSearchRequest.v2()));
void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
String name = namedSearchRequest.v1();
SearchRequest searchRequest = namedSearchRequest.v2();
// We want to treat a request to search 0 indices as a request to do nothing, not a request to search all indices
if (searchRequest.indices().length == 0) {
logger.debug("[{}] Search request [{}] optimized to noop; searchRequest [{}]", getJobId(), name, searchRequest);
listener.onResponse(null);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of a magic value null is dangerous, what if the client returns null. It seems better to return an empty result set, so that null causes the transform to fail as before.

return;
}
logger.trace("searchRequest: [{}]", searchRequest);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.trace("searchRequest: [{}]", searchRequest);
logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", searchRequest));


PointInTimeBuilder pit = namedSearchRequest.v2().pointInTimeBuilder();
PointInTimeBuilder pit = searchRequest.pointInTimeBuilder();

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
namedSearchRequest.v2(),
searchRequest,
ActionListener.wrap(response -> {
// did the pit change?
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
namedPits.put(namedSearchRequest.v1(), new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
logger.trace("point in time handle has changed; request [{}]", namedSearchRequest.v1());
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId().equals(pit.getEncodedId())) == false) {
namedPits.put(name, new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
logger.trace("point in time handle has changed; request [{}]", name);
}

listener.onResponse(response);
Expand All @@ -484,18 +492,18 @@ private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionLis
new ParameterizedMessage(
"[{}] Search context missing, falling back to normal search; request [{}]",
getJobId(),
namedSearchRequest.v1()
name
),
e
);
namedPits.remove(namedSearchRequest.v1());
namedSearchRequest.v2().source().pointInTimeBuilder(null);
namedPits.remove(name);
searchRequest.source().pointInTimeBuilder(null);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
namedSearchRequest.v2(),
searchRequest,
listener
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,8 @@ private void finalizeCheckpoint(ActionListener<Void> listener) {
}

if (lastCheckpoint != null) {
long docsIndexed = 0;
long docsProcessed = 0;
docsIndexed = progress.getDocumentsIndexed();
docsProcessed = progress.getDocumentsProcessed();
long docsIndexed = progress.getDocumentsIndexed();
long docsProcessed = progress.getDocumentsProcessed();

long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp();
getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed);
Expand Down Expand Up @@ -1131,7 +1129,7 @@ private SearchRequest buildQueryToFindChanges() {
// TODO: if buildChangesQuery changes the query it get overwritten
sourceBuilder.query(filteredQuery);

logger.debug("[{}] Querying for changes: {}", getJobId(), sourceBuilder);
logger.debug("[{}] Querying {} for changes: {}", getJobId(), request.indices(), sourceBuilder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ParameterizedMessage?

return request.source(sourceBuilder);
}

Expand Down Expand Up @@ -1168,7 +1166,7 @@ private SearchRequest buildQueryToUpdateDestinationIndex() {
}

sourceBuilder.query(queryBuilder);
logger.debug(() -> new ParameterizedMessage("[{}] Querying for data: {}", getJobId(), sourceBuilder));
logger.debug("[{}] Querying {} for data: {}", getJobId(), request.indices(), sourceBuilder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always thought that the logger would serialize the strings unless we provided a message provider.

Consequently, if we weren't on debug level, we are serializing the sourceBuilder unnecessarily.


return request.source(sourceBuilder)
.allowPartialSearchResults(false) // shard failures should fail the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,8 @@

public class ClientTransformIndexerTests extends ESTestCase {

public void testAudiOnFinishFrequency() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));

ClientTransformIndexer indexer = new ClientTransformIndexer(
mock(ThreadPool.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
mock(SchedulerEngine.class)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
mock(Client.class),
mock(TransformIndexerStats.class),
mock(TransformConfig.class),
null,
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);

public void testAuditOnFinishFrequency() {
ClientTransformIndexer indexer = createTestIndexer();
List<Boolean> shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());

// Audit every checkpoint for the first 10
Expand Down Expand Up @@ -125,6 +101,17 @@ public void testAudiOnFinishFrequency() {
assertFalse(shouldAudit.get(11_999));
}

public void testDoSearchGivenNoIndices() {
ClientTransformIndexer indexer = createTestIndexer();
SearchRequest searchRequest = new SearchRequest(new String[0]);
Tuple<String, SearchRequest> namedSearchRequest = new Tuple<>("test", searchRequest);
indexer.doSearch(namedSearchRequest, ActionListener.wrap(
// A search of zero indices should return null rather than attempt to search all indices
ESTestCase::assertNull,
e -> fail(e.getMessage())
));
}

public void testPitInjection() throws InterruptedException {
TransformConfig config = TransformConfigTests.randomTransformConfig();

Expand Down Expand Up @@ -334,7 +321,7 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer {

@Override
protected Tuple<String, SearchRequest> buildSearchRequest() {
return new Tuple<>("mock", new SearchRequest().source(new SearchSourceBuilder()));
return new Tuple<>("mock", new SearchRequest("source_index").source(new SearchSourceBuilder()));
}
}

Expand Down Expand Up @@ -427,4 +414,31 @@ private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> f
function.accept(listener);
assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
}

private ClientTransformIndexer createTestIndexer() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));

return new ClientTransformIndexer(
mock(ThreadPool.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
mock(SchedulerEngine.class)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
mock(Client.class),
mock(TransformIndexerStats.class),
mock(TransformConfig.class),
null,
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);
}
}