diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java new file mode 100644 index 0000000000000..eca575b7a8d08 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +import org.apache.ignite.client.ClientTransaction; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Represents {@link TransactionProxy} implementation that uses {@link ClientTransaction} to perform transaction + * operations. + */ +public class ClientTransactionProxy implements TransactionProxy { + /** */ + private final ClientTransaction tx; + + /** */ + public ClientTransactionProxy(ClientTransaction tx) { + this.tx = tx; + } + + /** {@inheritDoc} */ + @Override public void commit() { + tx.commit(); + } + + /** {@inheritDoc} */ + @Override public void rollback() { + tx.rollback(); + } + + /** {@inheritDoc} */ + @Override public void close() { + tx.close(); + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + throw new UnsupportedOperationException("Operation is not supported by thin client."); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientTransactionProxy.class, this); + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java new file mode 100644 index 0000000000000..440867dd23cbd --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/ClientTransactionProxyFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +import org.apache.ignite.client.ClientTransactions; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Represents {@link TransactionProxyFactory} implementation that uses Ignite thin client transaction facade to start + * new transaction. + */ +public class ClientTransactionProxyFactory implements TransactionProxyFactory { + /** */ + private final ClientTransactions txs; + + /** */ + public ClientTransactionProxyFactory(ClientTransactions txs) { + this.txs = txs; + } + + /** {@inheritDoc} */ + @Override public TransactionProxy txStart( + TransactionConcurrency concurrency, + TransactionIsolation isolation, + long timeout + ) { + return new ClientTransactionProxy(txs.txStart(concurrency, isolation, timeout)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object other) { + if (this == other) + return true; + + if (other == null || getClass() != other.getClass()) + return false; + + return txs.equals(((ClientTransactionProxyFactory)other).txs); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return txs.hashCode(); + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java new file mode 100644 index 0000000000000..15eaaae54b5cd --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.transactions.Transaction; + +/** + * Represents {@link TransactionProxy} implementation that uses {@link Transaction} to perform transaction + * operations. + */ +public class IgniteTransactionProxy implements TransactionProxy { + /** */ + private final Transaction tx; + + /** */ + public IgniteTransactionProxy(Transaction tx) { + this.tx = tx; + } + + /** {@inheritDoc} */ + @Override public void commit() { + tx.commit(); + } + + /** {@inheritDoc} */ + @Override public void rollback() { + tx.rollback(); + } + + /** {@inheritDoc} */ + @Override public void close() { + tx.close(); + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + return tx.setRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteTransactionProxy.class, this); + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java new file mode 100644 index 0000000000000..efc0c51d30d00 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/IgniteTransactionProxyFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +import java.util.Objects; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Represents {@link TransactionProxyFactory} implementation that uses Ignite node transaction facade to start new + * transaction. + */ +public class IgniteTransactionProxyFactory implements TransactionProxyFactory { + /** */ + private final IgniteTransactions txs; + + /** */ + public IgniteTransactionProxyFactory(IgniteTransactions txs) { + this.txs = txs; + } + + /** {@inheritDoc} */ + @Override public TransactionProxy txStart( + TransactionConcurrency concurrency, + TransactionIsolation isolation, + long timeout + ) { + return new IgniteTransactionProxy(txs.txStart(concurrency, isolation, timeout, 0)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object other) { + if (this == other) + return true; + + if (other == null || getClass() != other.getClass()) + return false; + + return txs.equals(((IgniteTransactionProxyFactory)other).txs); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(txs); + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java new file mode 100644 index 0000000000000..a40a5bd799251 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +/** Represents Ignite client-independent transaction operations. */ +public interface TransactionProxy extends AutoCloseable { + /** Commits this transaction. */ + public void commit(); + + /** Rolls back this transaction. */ + public void rollback(); + + /** Ends the transaction. Transaction will be rolled back if it has not been committed. */ + @Override public void close(); + + /** + * Modify the transaction associated with the current thread such that the + * only possible outcome of the transaction is to roll back the + * transaction. + * + * @return {@code True} if rollback-only flag was set as a result of this operation, + * {@code false} if it was already set prior to this call or could not be set + * because transaction is already finishing up committing or rolling back. + */ + public boolean setRollbackOnly(); +} diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java new file mode 100644 index 0000000000000..548472393ebb0 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/internal/transactions/proxy/TransactionProxyFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions.proxy; + +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** Represents Ignite client-independent transaction factory. */ +public interface TransactionProxyFactory { + /** Starts transaction with specified concurrency, isolation and timeout. */ + public TransactionProxy txStart(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout); +} diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java new file mode 100644 index 0000000000000..717baeb85de71 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/AbstractSpringTransactionManager.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.transactions.spring; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.transactions.proxy.TransactionProxy; +import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.transaction.CannotCreateTransactionException; +import org.springframework.transaction.InvalidIsolationLevelException; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; +import org.springframework.transaction.support.ResourceTransactionManager; +import org.springframework.transaction.support.SmartTransactionObject; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.transaction.support.TransactionSynchronizationUtils; + +/** Abstract implementation of Spring Transaction manager with omitted Ignite cluster access logic. */ +public abstract class AbstractSpringTransactionManager extends AbstractPlatformTransactionManager + implements ResourceTransactionManager, ApplicationListener +{ + /** Transaction factory.*/ + private TransactionProxyFactory txFactory; + + /** Ignite logger. */ + private IgniteLogger log; + + /** Transaction concurrency level. */ + private TransactionConcurrency txConcurrency; + + /** Default transaction isolation. */ + private TransactionIsolation dfltTxIsolation; + + /** Default transaction timeout. */ + private long dfltTxTimeout; + + /** + * Gets transaction concurrency level. + * + * @return Transaction concurrency level. + */ + public TransactionConcurrency getTransactionConcurrency() { + return txConcurrency; + } + + /** + * Sets transaction concurrency level. + * + * @param txConcurrency transaction concurrency level. + */ + public void setTransactionConcurrency(TransactionConcurrency txConcurrency) { + this.txConcurrency = txConcurrency; + } + + /** {@inheritDoc} */ + @Override public void onApplicationEvent(ContextRefreshedEvent evt) { + if (txConcurrency == null) + txConcurrency = defaultTransactionConcurrency(); + + dfltTxIsolation = defaultTransactionIsolation(); + + dfltTxTimeout = defaultTransactionTimeout(); + + log = log(); + + txFactory = createTransactionFactory(); + } + + /** {@inheritDoc} */ + @Override protected Object doGetTransaction() throws TransactionException { + IgniteTransactionObject txObj = new IgniteTransactionObject(); + + txObj.setTransactionHolder( + (IgniteTransactionHolder)TransactionSynchronizationManager.getResource(txFactory), false); + + return txObj; + } + + /** {@inheritDoc} */ + @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + if (definition.getIsolationLevel() == TransactionDefinition.ISOLATION_READ_UNCOMMITTED) + throw new InvalidIsolationLevelException("Ignite does not support READ_UNCOMMITTED isolation level."); + + IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; + TransactionProxy tx = null; + + try { + if (txObj.getTransactionHolder() == null || txObj.getTransactionHolder().isSynchronizedWithTransaction()) { + long timeout = dfltTxTimeout; + + if (definition.getTimeout() > 0) + timeout = TimeUnit.SECONDS.toMillis(definition.getTimeout()); + + TransactionProxy newTx = txFactory.txStart(txConcurrency, + convertToIgniteIsolationLevel(definition.getIsolationLevel()), timeout); + + if (log.isDebugEnabled()) + log.debug("Started Ignite transaction: " + newTx); + + txObj.setTransactionHolder(new IgniteTransactionHolder(newTx), true); + } + + txObj.getTransactionHolder().setSynchronizedWithTransaction(true); + txObj.getTransactionHolder().setTransactionActive(true); + + tx = txObj.getTransactionHolder().getTransaction(); + + // Bind the session holder to the thread. + if (txObj.isNewTransactionHolder()) + TransactionSynchronizationManager.bindResource(txFactory, txObj.getTransactionHolder()); + } + catch (Exception ex) { + if (tx != null) + tx.close(); + + throw new CannotCreateTransactionException("Could not create Ignite transaction", ex); + } + } + + /** {@inheritDoc} */ + @Override protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); + TransactionProxy tx = txObj.getTransactionHolder().getTransaction(); + + if (status.isDebug() && log.isDebugEnabled()) + log.debug("Committing Ignite transaction: " + tx); + + try { + tx.commit(); + } + catch (Exception e) { + throw new TransactionSystemException("Could not commit Ignite transaction", e); + } + } + + /** {@inheritDoc} */ + @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); + TransactionProxy tx = txObj.getTransactionHolder().getTransaction(); + + if (status.isDebug() && log.isDebugEnabled()) + log.debug("Rolling back Ignite transaction: " + tx); + + try { + tx.rollback(); + } + catch (Exception e) { + throw new TransactionSystemException("Could not rollback Ignite transaction", e); + } + } + + /** {@inheritDoc} */ + @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException { + IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); + TransactionProxy tx = txObj.getTransactionHolder().getTransaction(); + + assert tx != null; + + if (status.isDebug() && log.isDebugEnabled()) + log.debug("Setting Ignite transaction rollback-only: " + tx); + + tx.setRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override protected void doCleanupAfterCompletion(Object transaction) { + IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; + + // Remove the transaction holder from the thread, if exposed. + if (txObj.isNewTransactionHolder()) { + TransactionProxy tx = txObj.getTransactionHolder().getTransaction(); + TransactionSynchronizationManager.unbindResource(txFactory); + + if (log.isDebugEnabled()) + log.debug("Releasing Ignite transaction: " + tx); + } + + txObj.getTransactionHolder().clear(); + } + + /** {@inheritDoc} */ + @Override protected boolean isExistingTransaction(Object transaction) throws TransactionException { + IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; + + return (txObj.getTransactionHolder() != null && txObj.getTransactionHolder().isTransactionActive()); + } + + /** {@inheritDoc} */ + @Override public Object getResourceFactory() { + return txFactory; + } + + /** + * @param isolationLevel Spring isolation level. + * @return Ignite isolation level. + */ + private TransactionIsolation convertToIgniteIsolationLevel(int isolationLevel) { + TransactionIsolation isolation = dfltTxIsolation; + + switch (isolationLevel) { + case TransactionDefinition.ISOLATION_READ_COMMITTED: + isolation = TransactionIsolation.READ_COMMITTED; + + break; + + case TransactionDefinition.ISOLATION_REPEATABLE_READ: + isolation = TransactionIsolation.REPEATABLE_READ; + + break; + + case TransactionDefinition.ISOLATION_SERIALIZABLE: + isolation = TransactionIsolation.SERIALIZABLE; + } + + return isolation; + } + + /** @return Default transaction isolation. */ + protected abstract TransactionIsolation defaultTransactionIsolation(); + + /** @return Default transaction timeout. */ + protected abstract long defaultTransactionTimeout(); + + /** @return Default transaction concurrency. */ + protected abstract TransactionConcurrency defaultTransactionConcurrency(); + + /** Creates instance of {@link TransactionProxyFactory} that will be used to start new Ignite transactions. */ + protected abstract TransactionProxyFactory createTransactionFactory(); + + /** @return Ignite logger. */ + protected abstract IgniteLogger log(); + + /** + * An object representing a managed Ignite transaction. + */ + protected static class IgniteTransactionObject implements SmartTransactionObject { + /** */ + private IgniteTransactionHolder txHolder; + + /** */ + private boolean newTxHolder; + + /** + * Sets the resource holder being used to hold Ignite resources in the + * transaction. + * + * @param txHolder the transaction resource holder + * @param newTxHolder true if the holder was created for this transaction, + * false if it already existed + */ + private void setTransactionHolder(IgniteTransactionHolder txHolder, boolean newTxHolder) { + this.txHolder = txHolder; + this.newTxHolder = newTxHolder; + } + + /** + * Returns the resource holder being used to hold Ignite resources in the + * transaction. + * + * @return the transaction resource holder + */ + protected IgniteTransactionHolder getTransactionHolder() { + return txHolder; + } + + /** + * Returns true if the transaction holder was created for the current + * transaction and false if it existed prior to the transaction. + * + * @return true if the holder was created for this transaction, false if it + * already existed + */ + private boolean isNewTransactionHolder() { + return newTxHolder; + } + + /** {@inheritDoc} */ + @Override public boolean isRollbackOnly() { + return txHolder.isRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public void flush() { + TransactionSynchronizationUtils.triggerFlush(); + } + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java new file mode 100644 index 0000000000000..25eff87ed7894 --- /dev/null +++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.transactions.spring; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.configuration.ClientTransactionConfiguration; +import org.apache.ignite.internal.transactions.proxy.ClientTransactionProxyFactory; +import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.support.DefaultTransactionStatus; + +/** + * Represents {@link AbstractSpringTransactionManager} implementation that uses thin client to access the cluster and + * manage transactions. It requires thin client instance to be set before manager use + * (see {@link #setClientInstance(IgniteClient)}). + * + * You can provide ignite client instance to a Spring configuration XML file, like below: + * + *
+ * <beans xmlns="http://www.springframework.org/schema/beans"
+ *        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ *        xmlns:tx="http://www.springframework.org/schema/tx"
+ *        xsi:schemaLocation="
+ *            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ *            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
+ *     <-- Provide Ignite client instance. -->
+ *     <bean id="transactionManager" class="org.apache.ignite.transactions.spring.IgniteClientSpringTransactionManager">
+ *         <property name="clientInstance" ref="igniteClientBean"/>
+ *     </bean>
+ *
+ *     <-- Use annotation-driven transaction configuration. -->
+ *     <tx:annotation-driven/>
+ * </beans>
+ * 
+ * + * Note that the same thin client instance must be used to both initialize the transaction manager and perform + * transactional operations. + * + * @see SpringTransactionManager to configure Transaction Manager access to the cluster through the Ignite client node. + */ +public class IgniteClientSpringTransactionManager extends AbstractSpringTransactionManager { + /** No-op Ignite logger. */ + private static final IgniteLogger NOOP_LOG = new NullLogger(); + + /** Thin client instance. */ + private IgniteClient cli; + + /** @return Thin client instance that is used for accessing the Ignite cluster. */ + public IgniteClient getClientInstance() { + return cli; + } + + /** Sets thin client instance that is used for accessing the Ignite cluster. */ + public void setClientInstance(IgniteClient cli) { + this.cli = cli; + } + + /** {@inheritDoc} */ + @Override public void onApplicationEvent(ContextRefreshedEvent evt) { + if (cli == null) { + throw new IllegalArgumentException("Failed to obtain thin client instance for accessing the Ignite" + + " cluster. Check that 'clientInstance' property is set."); + } + + super.onApplicationEvent(evt); + } + + /** {@inheritDoc} */ + @Override protected TransactionIsolation defaultTransactionIsolation() { + return ClientTransactionConfiguration.DFLT_TX_ISOLATION; + } + + /** {@inheritDoc} */ + @Override protected long defaultTransactionTimeout() { + return ClientTransactionConfiguration.DFLT_TRANSACTION_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected TransactionConcurrency defaultTransactionConcurrency() { + return ClientTransactionConfiguration.DFLT_TX_CONCURRENCY; + } + + /** {@inheritDoc} */ + @Override protected TransactionProxyFactory createTransactionFactory() { + return new ClientTransactionProxyFactory(cli.transactions()); + } + + /** {@inheritDoc} */ + @Override protected IgniteLogger log() { + return NOOP_LOG; + } + + /** {@inheritDoc} */ + @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException { + ((IgniteTransactionObject)status.getTransaction()).getTransactionHolder().setRollbackOnly(); + } +} diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java index e2c713340562e..d0363a13f97ea 100644 --- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java +++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/IgniteTransactionHolder.java @@ -17,6 +17,7 @@ package org.apache.ignite.transactions.spring; +import org.apache.ignite.internal.transactions.proxy.TransactionProxy; import org.apache.ignite.transactions.Transaction; import org.springframework.transaction.support.ResourceHolderSupport; @@ -26,7 +27,7 @@ */ class IgniteTransactionHolder extends ResourceHolderSupport { /** */ - private Transaction transaction; + private TransactionProxy transaction; /** */ private boolean transactionActive; @@ -36,7 +37,7 @@ class IgniteTransactionHolder extends ResourceHolderSupport { * * @param transaction the transaction to hold */ - IgniteTransactionHolder(Transaction transaction) { + IgniteTransactionHolder(TransactionProxy transaction) { this.transaction = transaction; } @@ -54,7 +55,7 @@ public boolean hasTransaction() { * * @param transaction the transaction */ - void setTransaction(Transaction transaction) { + void setTransaction(TransactionProxy transaction) { this.transaction = transaction; } @@ -63,7 +64,7 @@ void setTransaction(Transaction transaction) { * * @return the transaction or null */ - Transaction getTransaction() { + TransactionProxy getTransaction() { return this.transaction; } diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java index 0d7056e164ebd..8736c6f9c08a8 100644 --- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java +++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java @@ -17,32 +17,23 @@ package org.apache.ignite.transactions.spring; -import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSpring; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.transactions.proxy.IgniteTransactionProxyFactory; +import org.apache.ignite.internal.transactions.proxy.TransactionProxyFactory; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.transaction.CannotCreateTransactionException; -import org.springframework.transaction.InvalidIsolationLevelException; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionSystemException; -import org.springframework.transaction.support.AbstractPlatformTransactionManager; -import org.springframework.transaction.support.DefaultTransactionStatus; -import org.springframework.transaction.support.ResourceTransactionManager; -import org.springframework.transaction.support.TransactionSynchronizationManager; /** * Implementation of Spring transaction abstraction based on Ignite transaction. @@ -199,71 +190,29 @@ * } * */ -public class SpringTransactionManager extends AbstractPlatformTransactionManager - implements ResourceTransactionManager, PlatformTransactionManager, ApplicationListener, ApplicationContextAware { - /** - * Logger. - */ - private IgniteLogger log; - - /** - * Transaction concurrency level. - */ - private TransactionConcurrency transactionConcurrency; - - /** - * Grid configuration file path. - */ +public class SpringTransactionManager extends AbstractSpringTransactionManager implements ApplicationContextAware, + DisposableBean +{ + /** Grid configuration file path. */ private String cfgPath; - /** - * Ignite configuration. - */ + /** Ignite configuration. */ private IgniteConfiguration cfg; - /** - * Ignite instance name. - */ + /** Ignite instance name. */ private String igniteInstanceName; - /** - * Ignite instance. - */ + /** Ignite instance. */ private Ignite ignite; - /** Spring context */ - private ApplicationContext springCtx; + /** Flag indicating that Ignite instance was not created inside current transaction manager. */ + private boolean externalIgniteInstance; - /** {@inheritDoc} */ - @Override public void setApplicationContext(ApplicationContext ctx) { - this.springCtx = ctx; - } - - /** - * Constructs the transaction manager with no target Ignite instance. An - * instance must be set before use. - */ - public SpringTransactionManager() { - setNestedTransactionAllowed(false); - } + /** Ignite transactions configuration. */ + private TransactionConfiguration txCfg; - /** - * Gets transaction concurrency level. - * - * @return Transaction concurrency level. - */ - public TransactionConcurrency getTransactionConcurrency() { - return transactionConcurrency; - } - - /** - * Sets transaction concurrency level. - * - * @param transactionConcurrency transaction concurrency level. - */ - public void setTransactionConcurrency(TransactionConcurrency transactionConcurrency) { - this.transactionConcurrency = transactionConcurrency; - } + /** Spring context */ + private ApplicationContext springCtx; /** * Gets configuration file path. @@ -342,220 +291,69 @@ public void setIgniteInstanceName(String igniteInstanceName) { } /** {@inheritDoc} */ - @Override public void onApplicationEvent(ContextRefreshedEvent event) { + @Override public void onApplicationEvent(ContextRefreshedEvent evt) { if (ignite == null) { if (cfgPath != null && cfg != null) { throw new IllegalArgumentException("Both 'configurationPath' and 'configuration' are " + "provided. Set only one of these properties if you need to start a Ignite node inside of " + - "SpringCacheManager. If you already have a node running, omit both of them and set" + + "SpringTransactionManager. If you already have a node running, omit both of them and set" + "'igniteInstanceName' property."); } try { - if (cfgPath != null) { + if (cfgPath != null) ignite = IgniteSpring.start(cfgPath, springCtx); - } else if (cfg != null) ignite = IgniteSpring.start(cfg, springCtx); - else + else { ignite = Ignition.ignite(igniteInstanceName); + + externalIgniteInstance = true; + } } catch (IgniteCheckedException e) { throw U.convertException(e); } - } - - if (transactionConcurrency == null) - transactionConcurrency = ignite.configuration().getTransactionConfiguration().getDefaultTxConcurrency(); - - log = ignite.log(); - } - - /** {@inheritDoc} */ - @Override protected Object doGetTransaction() throws TransactionException { - IgniteTransactionObject txObj = new IgniteTransactionObject(); - - txObj.setTransactionHolder( - (IgniteTransactionHolder)TransactionSynchronizationManager.getResource(this.ignite), false); - - return txObj; - } - - /** {@inheritDoc} */ - @Override protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { - if (definition.getIsolationLevel() == TransactionDefinition.ISOLATION_READ_UNCOMMITTED) - throw new InvalidIsolationLevelException("Ignite does not support READ_UNCOMMITTED isolation level."); - - IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; - Transaction tx = null; - - try { - if (txObj.getTransactionHolder() == null || txObj.getTransactionHolder().isSynchronizedWithTransaction()) { - long timeout = ignite.configuration().getTransactionConfiguration().getDefaultTxTimeout(); - - if (definition.getTimeout() > 0) - timeout = TimeUnit.SECONDS.toMillis(definition.getTimeout()); - - Transaction newTx = ignite.transactions().txStart(transactionConcurrency, - convertToIgniteIsolationLevel(definition.getIsolationLevel()), timeout, 0); - - if (log.isDebugEnabled()) - log.debug("Started Ignite transaction: " + newTx); - - txObj.setTransactionHolder(new IgniteTransactionHolder(newTx), true); - } - - txObj.getTransactionHolder().setSynchronizedWithTransaction(true); - txObj.getTransactionHolder().setTransactionActive(true); - - tx = txObj.getTransactionHolder().getTransaction(); - // Bind the session holder to the thread. - if (txObj.isNewTransactionHolder()) - TransactionSynchronizationManager.bindResource(this.ignite, txObj.getTransactionHolder()); + txCfg = ignite.configuration().getTransactionConfiguration(); } - catch (Exception ex) { - if (tx != null) - tx.close(); - throw new CannotCreateTransactionException("Could not create Ignite transaction", ex); - } + super.onApplicationEvent(evt); } /** {@inheritDoc} */ - @Override protected void doCommit(DefaultTransactionStatus status) throws TransactionException { - IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); - Transaction tx = txObj.getTransactionHolder().getTransaction(); - - if (status.isDebug() && log.isDebugEnabled()) - log.debug("Committing Ignite transaction: " + tx); - - try { - tx.commit(); - } - catch (IgniteException e) { - throw new TransactionSystemException("Could not commit Ignite transaction", e); - } + @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { + this.springCtx = ctx; } /** {@inheritDoc} */ - @Override protected void doRollback(DefaultTransactionStatus status) throws TransactionException { - IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); - Transaction tx = txObj.getTransactionHolder().getTransaction(); - - if (status.isDebug() && log.isDebugEnabled()) - log.debug("Rolling back Ignite transaction: " + tx); - - try { - tx.rollback(); - } - catch (IgniteException e) { - throw new TransactionSystemException("Could not rollback Ignite transaction", e); - } + @Override protected TransactionIsolation defaultTransactionIsolation() { + return txCfg.getDefaultTxIsolation(); } /** {@inheritDoc} */ - @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException { - IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction(); - Transaction tx = txObj.getTransactionHolder().getTransaction(); - - assert tx != null; - - if (status.isDebug() && log.isDebugEnabled()) - log.debug("Setting Ignite transaction rollback-only: " + tx); - - tx.setRollbackOnly(); + @Override protected long defaultTransactionTimeout() { + return txCfg.getDefaultTxTimeout(); } /** {@inheritDoc} */ - @Override protected void doCleanupAfterCompletion(Object transaction) { - IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; - - // Remove the transaction holder from the thread, if exposed. - if (txObj.isNewTransactionHolder()) { - Transaction tx = txObj.getTransactionHolder().getTransaction(); - TransactionSynchronizationManager.unbindResource(this.ignite); - - if (log.isDebugEnabled()) - log.debug("Releasing Ignite transaction: " + tx); - } - - txObj.getTransactionHolder().clear(); + @Override protected IgniteLogger log() { + return ignite.log(); } /** {@inheritDoc} */ - @Override protected boolean isExistingTransaction(Object transaction) throws TransactionException { - IgniteTransactionObject txObj = (IgniteTransactionObject)transaction; - - return (txObj.getTransactionHolder() != null && txObj.getTransactionHolder().isTransactionActive()); + @Override protected TransactionConcurrency defaultTransactionConcurrency() { + return txCfg.getDefaultTxConcurrency(); } /** {@inheritDoc} */ - @Override public Object getResourceFactory() { - return this.ignite; - } - - /** - * @param isolationLevel Spring isolation level. - * @return Ignite isolation level. - */ - private TransactionIsolation convertToIgniteIsolationLevel(int isolationLevel) { - TransactionIsolation isolation = ignite.configuration().getTransactionConfiguration().getDefaultTxIsolation(); - switch (isolationLevel) { - case TransactionDefinition.ISOLATION_READ_COMMITTED: - isolation = TransactionIsolation.READ_COMMITTED; - break; - case TransactionDefinition.ISOLATION_REPEATABLE_READ: - isolation = TransactionIsolation.REPEATABLE_READ; - break; - case TransactionDefinition.ISOLATION_SERIALIZABLE: - isolation = TransactionIsolation.SERIALIZABLE; - } - return isolation; + @Override protected TransactionProxyFactory createTransactionFactory() { + return new IgniteTransactionProxyFactory(ignite.transactions()); } - /** - * An object representing a managed Ignite transaction. - */ - private static class IgniteTransactionObject { - /** */ - private IgniteTransactionHolder transactionHolder; - - /** */ - private boolean newTransactionHolder; - - /** - * Sets the resource holder being used to hold Ignite resources in the - * transaction. - * - * @param transactionHolder the transaction resource holder - * @param newHolder true if the holder was created for this transaction, - * false if it already existed - */ - private void setTransactionHolder(IgniteTransactionHolder transactionHolder, boolean newHolder) { - this.transactionHolder = transactionHolder; - this.newTransactionHolder = newHolder; - } - - /** - * Returns the resource holder being used to hold Ignite resources in the - * transaction. - * - * @return the transaction resource holder - */ - private IgniteTransactionHolder getTransactionHolder() { - return transactionHolder; - } - - /** - * Returns true if the transaction holder was created for the current - * transaction and false if it existed prior to the transaction. - * - * @return true if the holder was created for this transaction, false if it - * already existed - */ - private boolean isNewTransactionHolder() { - return newTransactionHolder; - } + /** {@inheritDoc} */ + @Override public void destroy() { + if (!externalIgniteInstance) + ignite.close(); } } diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java index c259db8ce1bc3..312ae5c39d4d0 100644 --- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java +++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java @@ -44,6 +44,7 @@ import org.apache.ignite.startup.cmdline.GridCommandLineLoaderTest; import org.apache.ignite.transactions.spring.GridSpringTransactionManagerSelfTest; import org.apache.ignite.transactions.spring.GridSpringTransactionManagerSpringBeanSelfTest; +import org.apache.ignite.transactions.spring.IgniteClientSpringTransactionManagerTest; import org.apache.ignite.transactions.spring.SpringTransactionManagerContextInjectionTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -80,6 +81,7 @@ GridSpringTransactionManagerSelfTest.class, GridSpringTransactionManagerSpringBeanSelfTest.class, + IgniteClientSpringTransactionManagerTest.class, GridServiceInjectionSpringResourceTest.class, IgniteSpringBeanSpringResourceInjectionTest.class, diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java index e40b2bc96741f..eb7cd78235ea0 100644 --- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerAbstractTest.java @@ -17,9 +17,9 @@ package org.apache.ignite.transactions.spring; -import org.apache.ignite.IgniteCache; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy; import org.junit.Test; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.InvalidIsolationLevelException; @@ -33,7 +33,7 @@ public abstract class GridSpringTransactionManagerAbstractTest extends GridCommo protected static final String CACHE_NAME = "testCache"; /** */ - public abstract IgniteCache cache(); + public abstract CacheProxy cache(); /** */ public abstract GridSpringTransactionService service(); diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java index 1f85f5a253428..d5e2a8e3ac981 100644 --- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java @@ -17,10 +17,11 @@ package org.apache.ignite.transactions.spring; -import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.IgniteCacheProxy; import org.springframework.context.ApplicationContext; import org.springframework.context.support.GenericXmlApplicationContext; @@ -45,8 +46,8 @@ public class GridSpringTransactionManagerSelfTest extends GridSpringTransactionM return cfg; } - @Override public IgniteCache cache() { - return grid().cache(CACHE_NAME); + @Override public CacheProxy cache() { + return new IgniteCacheProxy<>(grid().cache(CACHE_NAME)); } @Override public GridSpringTransactionService service() { diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java index 6ed14cf4d4a7b..19774a698d58c 100644 --- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSpringBeanSelfTest.java @@ -18,7 +18,8 @@ package org.apache.ignite.transactions.spring; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.IgniteCacheProxy; import org.springframework.context.ApplicationContext; import org.springframework.context.support.GenericXmlApplicationContext; @@ -30,8 +31,8 @@ public class GridSpringTransactionManagerSpringBeanSelfTest extends GridSpringTr /** */ private GridSpringTransactionService service; - @Override public IgniteCache cache() { - return ignite.cache(CACHE_NAME); + @Override public CacheProxy cache() { + return new IgniteCacheProxy<>(ignite.cache(CACHE_NAME)); } @Override public GridSpringTransactionService service() { diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java index 1a4c2b6ebc8ab..4a0d02f5aad7e 100644 --- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java +++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionService.java @@ -18,6 +18,8 @@ package org.apache.ignite.transactions.spring; import org.apache.ignite.IgniteCache; +import org.apache.ignite.client.ClientCache; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -26,12 +28,16 @@ * Service. */ public class GridSpringTransactionService { + /** */ + @Autowired + private GridSpringTransactionService self; + /** * @param cache Cache. * @param entryCnt Entries count. */ @Transactional - public void put(IgniteCache cache, int entryCnt) { + public void put(CacheProxy cache, int entryCnt) { for (int i = 0; i < entryCnt; i++) cache.put(i, String.valueOf(i)); } @@ -41,7 +47,7 @@ public void put(IgniteCache cache, int entryCnt) { * @param entryCnt Entries count. */ @Transactional - public void putWithError(IgniteCache cache, int entryCnt) { + public void putWithError(CacheProxy cache, int entryCnt) { for (int i = 0; i < entryCnt; i++) cache.put(i, String.valueOf(i)); @@ -52,7 +58,7 @@ public void putWithError(IgniteCache cache, int entryCnt) { * @param cache Cache. */ @Transactional(propagation = Propagation.MANDATORY) - public void putWithMandatoryPropagation(IgniteCache cache) { + public void putWithMandatoryPropagation(CacheProxy cache) { cache.put(1, "1"); } @@ -60,7 +66,84 @@ public void putWithMandatoryPropagation(IgniteCache cache) { * @param cache Cache. */ @Transactional(isolation = Isolation.READ_UNCOMMITTED) - public void putWithUnsupportedIsolationLevel(IgniteCache cache) { + public void putWithUnsupportedIsolationLevel(CacheProxy cache) { cache.put(1, "1"); } + + /** */ + @Transactional + public void putWithNestedError(CacheProxy cache, int entryCnt) { + self.put(cache, entryCnt); + + try { + self.putWithError(cache, entryCnt); + } + catch (Exception ignored) { + // No-op. + } + } + + /** */ + public static class ClientCacheProxy implements CacheProxy { + /** */ + private final ClientCache cliCache; + + /** */ + public ClientCacheProxy(ClientCache cliCache) { + this.cliCache = cliCache; + } + + /** {@inheritDoc} */ + @Override public void put(K key, V val) { + cliCache.put(key, val); + } + + /** {@inheritDoc} */ + @Override public int size() { + return cliCache.size(); + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + cliCache.removeAll(); + } + } + + /** */ + public static class IgniteCacheProxy implements CacheProxy { + /** */ + private final IgniteCache cache; + + /** */ + public IgniteCacheProxy(IgniteCache cache) { + this.cache = cache; + } + + /** {@inheritDoc} */ + @Override public void put(K key, V val) { + cache.put(key, val); + } + + /** {@inheritDoc} */ + @Override public int size() { + return cache.size(); + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + cache.removeAll(); + } + } + + /** */ + public static interface CacheProxy { + /** */ + public void put(K key, V val); + + /** */ + public int size(); + + /** */ + public void removeAll(); + } } diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java new file mode 100644 index 0000000000000..0d333eaf2c1b8 --- /dev/null +++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/IgniteClientSpringTransactionManagerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.transactions.spring; + +import org.apache.ignite.Ignition; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.CacheProxy; +import org.apache.ignite.transactions.spring.GridSpringTransactionService.ClientCacheProxy; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.UnexpectedRollbackException; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT; + +/** Tests Spring Transactions manager implementation that uses thin client to access the Ignite cluster. */ +public class IgniteClientSpringTransactionManagerTest extends GridSpringTransactionManagerAbstractTest { + /** Spring application context. */ + private static AnnotationConfigApplicationContext ctx; + + /** Ignite thin client instance. */ + private static IgniteClient cli; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration(new CacheConfiguration<>(CACHE_NAME) + .setAtomicityMode(TRANSACTIONAL)); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + + ctx = new AnnotationConfigApplicationContext(IgniteClientSpringTransactionManagerApplicationContext.class); + cli = ctx.getBean(IgniteClient.class); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + ctx.close(); + } + + /** {@inheritDoc} */ + @Override public CacheProxy cache() { + return new ClientCacheProxy<>(cli.cache(CACHE_NAME)); + } + + /** {@inheritDoc} */ + @Override public GridSpringTransactionService service() { + return ctx.getBean(GridSpringTransactionService.class); + } + + /** {@inheritDoc} */ + @Override public void testDoSetRollbackOnlyInExistingTransaction() { + GridTestUtils.assertThrowsAnyCause( + log, + () -> { + service().putWithNestedError(cache(), 1_000); + + return null; + }, + UnexpectedRollbackException.class, + "Transaction rolled back because it has been marked as rollback-only"); + + assertEquals(0, cache().size()); + } + + /** */ + @Configuration + @EnableTransactionManagement + public static class IgniteClientSpringTransactionManagerApplicationContext { + /** */ + @Bean + public GridSpringTransactionService transactionService() { + return new GridSpringTransactionService(); + } + + /** */ + @Bean + public IgniteClient igniteClient() { + return Ignition.startClient(new ClientConfiguration().setAddresses("127.0.0.1:" + DFLT_PORT)); + } + + /** */ + @Bean + public AbstractSpringTransactionManager transactionManager(IgniteClient cli) { + IgniteClientSpringTransactionManager mgr = new IgniteClientSpringTransactionManager(); + + mgr.setClientInstance(cli); + + return mgr; + } + } +}