Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.util.GridCommandHandlerIndexingClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerInterruptCommandTest;
import org.apache.ignite.util.GridCommandHandlerMetadataTest;
import org.apache.ignite.util.GridCommandHandlerPropertiesTest;
import org.apache.ignite.util.GridCommandHandlerSslTest;
Expand Down Expand Up @@ -68,6 +69,7 @@
GridCommandHandlerIndexingClusterByClassWithSSLTest.class,
GridCommandHandlerIndexingCheckSizeTest.class,
GridCommandHandlerCheckIndexesInlineSizeTest.class,
GridCommandHandlerInterruptCommandTest.class,
GridCommandHandlerMetadataTest.class,

KillCommandsCommandShTest.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.util;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DeploymentEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.junit.Test;

import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;

/**
* Checks cancel of execution validate_indexes command.
*/
public class GridCommandHandlerInterruptCommandTest extends GridCommandHandlerAbstractTest {
/** Load loop cycles. */
private static final int LOAD_LOOP = 500_000;

/** Idle verify task name. */
private static final String IDLE_VERIFY_TASK_V2 = "org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskV2";

/** Validate index task name. */
private static final String VALIDATE_INDEX_TASK = "org.apache.ignite.internal.visor.verify.VisorValidateIndexesTask";

/** Log listener. */
private ListeningTestLogger lnsrLog;

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

cleanPersistenceDir();
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();

cleanPersistenceDir();

super.afterTest();
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(lnsrLog)
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setInitialSize(200L * 1024 * 1024)
.setMaxSize(200L * 1024 * 1024)
)
)
.setIncludeEventTypes(EventType.EVT_TASK_DEPLOYED)
.setCacheConfiguration(new CacheConfiguration<Integer, UserValue>(DEFAULT_CACHE_NAME)
.setName(DEFAULT_CACHE_NAME)
.setQueryEntities(Collections.singleton(createQueryEntity())));
}

/**
* Creates predifened query entity.
*
* @return Query entity.
*/
private QueryEntity createQueryEntity() {
QueryEntity qryEntity = new QueryEntity();
qryEntity.setKeyType(Integer.class.getTypeName());
qryEntity.setValueType(UserValue.class.getName());
qryEntity.setTableName("USER_TEST_TABLE");

LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("x", "java.lang.Integer");
fields.put("y", "java.lang.Integer");
fields.put("z", "java.lang.Integer");
qryEntity.setFields(fields);

LinkedHashMap<String, Boolean> idxFields = new LinkedHashMap<>();

QueryIndex idx2 = new QueryIndex();
idx2.setName("IDX_2");
idx2.setIndexType(QueryIndexType.SORTED);
idxFields = new LinkedHashMap<>();
idxFields.put("x", false);
idx2.setFields(idxFields);

QueryIndex idx3 = new QueryIndex();
idx3.setName("IDX_3");
idx3.setIndexType(QueryIndexType.SORTED);
idxFields = new LinkedHashMap<>();
idxFields.put("y", false);
idx3.setFields(idxFields);

QueryIndex idx4 = new QueryIndex();
idx4.setName("IDX_4");
idx4.setIndexType(QueryIndexType.SORTED);
idxFields = new LinkedHashMap<>();
idxFields.put("z", false);
idx4.setFields(idxFields);

qryEntity.setIndexes(Arrays.asList(idx2, idx3, idx4));
return qryEntity;
}

/**
* User value.
*/
private static class UserValue {
/** X. */
private int x;

/** Y. */
private int y;

/** Z. */
private int z;

/**
* @param x X.
* @param y Y.
* @param z Z.
*/
public UserValue(int x, int y, int z) {
this.x = x;
this.y = y;
this.z = z;
}

/**
* @param seed Seed.
*/
public UserValue(long seed) {
x = (int)(seed % 6991);
y = (int)(seed % 18679);
z = (int)(seed % 31721);
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(UserValue.class, this);
}
}

/**
* Checks that validate_indexes command will cancel after it interrupted.
*
* @throws Exception If failed.
*/
@Test
public void testValidateIndexesCommand() throws Exception {
lnsrLog = new ListeningTestLogger(false, log);

IgniteEx ignite = startGrid(0);

ignite.cluster().active(true);

preloadeData(ignite);

CountDownLatch startTaskLatch = waitForTaskEvent(ignite, VALIDATE_INDEX_TASK);

LogListener lnsrValidationCancelled = LogListener.matches("Index validation was cancelled.").build();

lnsrLog.registerListener(lnsrValidationCancelled);

IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
assertSame(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "validate_indexes")));

startTaskLatch.await();

fut.cancel();

fut.get();

assertTrue(GridTestUtils.waitForCondition(() ->
ignite.compute().activeTaskFutures().isEmpty(), 10_000));

assertTrue(GridTestUtils.waitForCondition(lnsrValidationCancelled::check, 10_000));
}

/**
* Checks that idle verify command will not cancel if initiator client interrupted.
*
* @throws Exception If failed.
*/
@Test
public void testIdleVerifyCommand() throws Exception {
lnsrLog = new ListeningTestLogger(false, log);

IgniteEx ignite = startGrid(0);

ignite.cluster().active(true);

preloadeData(ignite);

CountDownLatch startTaskLatch = waitForTaskEvent(ignite, IDLE_VERIFY_TASK_V2);

LogListener lnsrValidationCancelled = LogListener.matches("Idle verify was cancelled.").build();

lnsrLog.registerListener(lnsrValidationCancelled);

IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
assertSame(EXIT_CODE_UNEXPECTED_ERROR, execute("--cache", "idle_verify")));

startTaskLatch.await();

fut.cancel();

fut.get();

assertTrue(GridTestUtils.waitForCondition(() ->
ignite.compute().activeTaskFutures().isEmpty(), 30_000));

assertFalse(lnsrValidationCancelled.check());
}

/**
* Method subscribe on task event and return a latch for waiting.
*
* @param ignite Ignite.
* @param taskName Task name.
* @return Latch which will open after event received.
*/

private CountDownLatch waitForTaskEvent(IgniteEx ignite, String taskName) {
CountDownLatch startTaskLatch = new CountDownLatch(1);

ignite.events().localListen((evt) -> {
assertTrue(evt instanceof DeploymentEvent);

if (taskName.equals(((DeploymentEvent)evt).alias())) {
startTaskLatch.countDown();

return false;
}

return true;
}, EventType.EVT_TASK_DEPLOYED);
return startTaskLatch;
}

/**
* Preload data to default cache.
*
* @param ignite Ignite.
*/
private void preloadeData(IgniteEx ignite) {
try (IgniteDataStreamer streamr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
for (int i = 0; i < LOAD_LOOP; i++)
streamr.addData(i, new UserValue(i));
}
}

/**
* Test invokes index validation closure and canceling it after started.
*
* @throws Exception If failed.
*/
@Test
public void testCancelValidateIndexesClosure() throws Exception {
IgniteEx ignite0 = startGrid(0);

ignite0.cluster().active(true);

preloadeData(ignite0);

AtomicBoolean cancelled = new AtomicBoolean(false);

ValidateIndexesClosure clo = new ValidateIndexesClosure(cancelled::get, Collections.singleton(DEFAULT_CACHE_NAME),
0, 0, false, true);

ListeningTestLogger listeningLogger = new ListeningTestLogger(false, log);

GridTestUtils.setFieldValue(clo, "ignite", ignite0);
GridTestUtils.setFieldValue(clo, "log", listeningLogger);

LogListener lnsrValidationStarted = LogListener.matches("Current progress of ValidateIndexesClosure").build();

listeningLogger.registerListener(lnsrValidationStarted);

IgniteInternalFuture fut = GridTestUtils.runAsync(() ->
GridTestUtils.assertThrows(log, clo::call, IgniteException.class, ValidateIndexesClosure.CANCELLED_MSG));

assertTrue(GridTestUtils.waitForCondition(lnsrValidationStarted::check, 10_000));

assertFalse(fut.isDone());

cancelled.set(true);

fut.get(10_000);
}
}
Loading