Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private void onEntriesLocked() {
cctx.database().checkpointReadLock();

try {
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE || txEntry.op() == TRANSFORM) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
if (expiry != null) {
cached.unswap(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,96 @@

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;

/**
*
*/
/** */
@RunWith(Parameterized.class)
public class CacheQueryFilterExpiredTest extends GridCommonAbstractTest {
/** */
@Parameterized.Parameter
public CacheMode cacheMode;

/** */
@Parameterized.Parameter(1)
public CacheAtomicityMode cacheAtomicityMode;

/** */
@Parameterized.Parameter(2)
public boolean eagerTtl;

/** */
@Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}, eagerTtl={2}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();

Stream.of(REPLICATED, PARTITIONED).forEach(cacheMode ->
Stream.of(ATOMIC, TRANSACTIONAL).forEach(cacheAtomicityMode ->
Stream.of(false, true).forEach(eagerTtl ->
params.add(new Object[] {cacheMode, cacheAtomicityMode, eagerTtl})
)
)
);

return params;
}

/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids();
}

/**
* @throws Exception If failed.
*/
@Test
public void testFilterExpired() throws Exception {
try (Ignite ignite = startGrid(0)) {
checkFilterExpired(ignite, ATOMIC, false);

checkFilterExpired(ignite, ATOMIC, true);

checkFilterExpired(ignite, TRANSACTIONAL, false);
try (Ignite ignite = startGrids(2)) {
checkFilterExpired(ignite);
}
}

checkFilterExpired(ignite, TRANSACTIONAL, true);
/**
* @throws Exception If failed.
*/
@Test
public void testInsertExpired() throws Exception {
try (Ignite ignite = startGrids(2)) {
checkInsertExpired(ignite);
}
}

/**
* @param ignite Node.
* @param atomicityMode Cache atomicity mode.
* @param eagerTtl Value for {@link CacheConfiguration#setEagerTtl(boolean)}.
* @throws Exception If failed.
*/
private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode, boolean eagerTtl) throws Exception {
private void checkFilterExpired(Ignite ignite) throws Exception {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setAtomicityMode(cacheAtomicityMode);
ccfg.setCacheMode(cacheMode);
ccfg.setEagerTtl(eagerTtl);
ccfg.setIndexedTypes(Integer.class, Integer.class);

Expand Down Expand Up @@ -94,4 +139,37 @@ private void checkFilterExpired(Ignite ignite, CacheAtomicityMode atomicityMode,
ignite.destroyCache(ccfg.getName());
}
}

/** */
private void checkInsertExpired(Ignite ignite) throws Exception {
CacheConfiguration<Integer, Integer> ccfgFrom = new CacheConfiguration<>("CACHE1");
ccfgFrom.setAtomicityMode(cacheAtomicityMode);
ccfgFrom.setCacheMode(cacheMode);
ccfgFrom.setEagerTtl(eagerTtl);
ccfgFrom.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_DAY));
ccfgFrom.setIndexedTypes(Integer.class, Integer.class);

CacheConfiguration<Integer, Integer> ccfgTo = new CacheConfiguration<>("CACHE2");
ccfgTo.setAtomicityMode(cacheAtomicityMode);
ccfgTo.setCacheMode(cacheMode);
ccfgTo.setEagerTtl(eagerTtl);
ccfgTo.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 2_000)));
ccfgTo.setIndexedTypes(Integer.class, Integer.class);

final IgniteCache<Integer, Integer> cacheFrom = ignite.createCache(ccfgFrom);
final IgniteCache<Integer, Integer> cacheTo = ignite.createCache(ccfgTo);

for (int i = 0; i < 10; i++)
cacheFrom.put(i, i);

cacheFrom.query(new SqlFieldsQuery("INSERT INTO cache2.INTEGER(_key, _val) SELECT _key, _val FROM cache1.INTEGER;")).getAll();

assertEquals(10, cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().size());

assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return cacheTo.query(new SqlFieldsQuery("select _key, _val from cache2.INTEGER")).getAll().isEmpty();
}
}, 5_000, 1_000));
}
}