diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 7ffceef90d14b..0cd683d8da87e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -24,9 +24,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.*; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; -import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.jetbrains.annotations.Nullable; /** @@ -194,26 +193,13 @@ public PlatformContext platformContext() { /** {@inheritDoc} */ @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception { - return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this); + return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this); } /** {@inheritDoc} */ @Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) throws Exception { - return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this); - } - - /** - * Get current future with proper exception conversions. - * - * @return Future. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) - protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException { - IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture(); - - return fut.internalFuture(); + return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this); } /** @@ -222,8 +208,8 @@ protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedExcept * @return current future. * @throws IgniteCheckedException */ - protected IgniteFuture currentFuture() throws IgniteCheckedException { - throw new IgniteCheckedException("Future listening is not supported in " + this.getClass()); + protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + throw new IgniteCheckedException("Future listening is not supported in " + getClass()); } /** @@ -232,7 +218,7 @@ protected IgniteFuture currentFuture() throws IgniteCheckedException { * @param opId Operation id. * @return A custom writer for given op id. */ - protected @Nullable PlatformFutureUtils.Writer futureWriter(int opId){ + @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){ return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 2f7cab26a293b..8e7c51da760a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -28,6 +28,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.cache.CacheOperationContext; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.GridConcurrentFactory; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -684,8 +686,8 @@ private static void writeError(BinaryRawWriterEx writer, Exception ex) { } /** */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - return cache.future(); + @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + return ((IgniteFutureImpl)cache.future()).internalFuture(); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 1dad126649919..1c979bf09c212 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.processors.platform.compute; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.internal.IgniteComputeImpl; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryObjectImpl; @@ -30,11 +28,17 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.utils.*; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.binary.BinaryObject; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; @@ -62,7 +66,7 @@ public class PlatformCompute extends PlatformAbstractTarget { private final IgniteComputeImpl compute; /** Future for previous asynchronous operation. */ - protected ThreadLocal> curFut = new ThreadLocal<>(); + protected ThreadLocal curFut = new ThreadLocal<>(); /** * Constructor. * @@ -213,8 +217,8 @@ public void withNoFailover() { } /** */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - IgniteFuture fut = curFut.get(); + @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + IgniteInternalFuture fut = curFut.get(); if (fut == null) throw new IllegalStateException("Asynchronous operation not started."); @@ -272,13 +276,7 @@ protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) { Object res = compute0.execute(taskName, arg); if (async) { - curFut.set(compute0.future().chain(new C1() { - private static final long serialVersionUID = 0L; - - @Override public Object apply(IgniteFuture fut) { - return toBinary(fut.get()); - } - })); + curFut.set(new ComputeConvertingFuture(compute0.future())); return null; } @@ -327,4 +325,113 @@ protected IgniteCompute computeForTask(Collection nodeIds) { return nodeIds == null ? compute : platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds)); } + + /** + * Wraps ComputeTaskFuture as IgniteInternalFuture. + */ + protected class ComputeConvertingFuture implements IgniteInternalFuture { + /** */ + private final IgniteInternalFuture fut; + + /** + * Ctor. + * + * @param fut Future to wrap. + */ + public ComputeConvertingFuture(ComputeTaskFuture fut) { + this.fut = ((IgniteFutureImpl)fut).internalFuture(); + } + + /** {@inheritDoc} */ + @Override public Object get() throws IgniteCheckedException { + return convertResult(fut.get()); + } + + /** {@inheritDoc} */ + @Override public Object get(long timeout) throws IgniteCheckedException { + return convertResult(fut.get(timeout)); + } + + /** {@inheritDoc} */ + @Override public Object get(long timeout, TimeUnit unit) throws IgniteCheckedException { + return convertResult(fut.get(timeout, unit)); + } + + /** {@inheritDoc} */ + @Override public Object getUninterruptibly() throws IgniteCheckedException { + return convertResult(fut.get()); + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return fut.cancel(); + } + + /** {@inheritDoc} */ + @Override public boolean isDone() { + return fut.isDone(); + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return fut.isCancelled(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return fut.startTime(); + } + + /** {@inheritDoc} */ + @Override public long duration() { + return fut.duration(); + } + + /** {@inheritDoc} */ + @Override public void listen(final IgniteInClosure lsnr) { + fut.listen(new IgniteInClosure() { + private static final long serialVersionUID = 0L; + + @Override public void apply(IgniteInternalFuture fut0) { + lsnr.apply(ComputeConvertingFuture.this); + } + }); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture chain(IgniteClosure doneCb) { + return null; // not supported + } + + /** {@inheritDoc} */ + @Override public Throwable error() { + try { + fut.get(); + return null; + } + catch (Throwable e) { + return e; + } + } + + /** {@inheritDoc} */ + @Override public Object result() { + try { + return fut.get(); + } + catch (Throwable ignored) { + return null; + } + } + + /** + * Converts future result. + * + * @param obj Object to convert. + * @return Result. + */ + protected Object convertResult(Object obj) { + return toBinary(obj); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 9bf0a8d89cbd3..71708af559c1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -17,24 +17,26 @@ package org.apache.ignite.internal.processors.platform.events; -import java.util.Arrays; -import java.util.Collection; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventAdapter; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; -import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; + /** * Interop events. */ @@ -269,8 +271,8 @@ public boolean isEnabled(int type) { } /** */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - return events.future(); + @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + return ((IgniteFutureImpl)events.future()).internalFuture(); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 88ea3c82c7832..619fea771fed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -19,13 +19,14 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import java.util.UUID; @@ -160,7 +161,7 @@ public PlatformMessaging withAsync() { } /** */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - return messaging.future(); + @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + return ((IgniteFutureImpl)messaging.future()).internalFuture(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 9676b6f01f478..963c72e6ca44d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.processors.platform.services; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteServices; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -31,12 +29,16 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure; import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; -import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + /** * Interop services. */ @@ -269,7 +271,7 @@ public Object dotNetServiceProxy(String name, boolean sticky) { } /** */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - return services.future(); + @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + return ((IgniteFutureImpl)services.future()).internalFuture(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java index c721e16a21b2d..7bcba3372e07c 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java @@ -67,6 +67,13 @@ private static class BroadcastJob extends ComputeJobAdapter { /** {@inheritDoc} */ @Nullable @Override public Object execute() { + try { + Thread.sleep(50); // Short sleep for cancellation tests. + } + catch (InterruptedException ignored) { + // No-op. + } + return ignite.cluster().localNode().id(); } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 72c02101a370a..a247f633142a6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -86,6 +86,7 @@ + diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs new file mode 100644 index 0000000000000..bbd116916647d --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Compute +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Compute; + using NUnit.Framework; + + /// + /// Cancellation tests. + /// + public class CancellationTest : IgniteTestBase + { + public CancellationTest() + : base("config\\compute\\compute-grid1.xml", "config\\compute\\compute-grid2.xml") + { + // No-op. + } + + [Test] + public void TestTask() + { + TestTask((c, t) => c.ExecuteAsync(new Task(), t)); + TestTask((c, t) => c.ExecuteAsync(new Task(), 1, t)); + TestTask((c, t) => c.ExecuteAsync>>(typeof(Task), t)); + TestTask((c, t) => c.ExecuteAsync>>(typeof(Task), 1, t)); + } + + [Test] + public void TestJavaTask() + { + using (var cts = new CancellationTokenSource()) + { + var task = Compute.ExecuteJavaTaskAsync(ComputeApiTest.BroadcastTask, null, cts.Token); + + Assert.IsFalse(task.IsCanceled); + + cts.Cancel(); + + Assert.IsTrue(task.IsCanceled); + + // Pass cancelled token + Assert.IsTrue( + Compute.ExecuteJavaTaskAsync(ComputeApiTest.BroadcastTask, null, cts.Token).IsCanceled); + } + } + + [Test] + public void TestClosures() + { + TestClosure((c, t) => c.BroadcastAsync(new ComputeAction(), t)); + TestClosure((c, t) => c.AffinityRunAsync(null, 0, new ComputeAction(), t)); + TestClosure((c, t) => c.RunAsync(new ComputeAction(), t)); + TestClosure((c, t) => c.RunAsync(Enumerable.Range(1, 10).Select(x => new ComputeAction()), t)); + TestClosure((c, t) => c.CallAsync(new ComputeFunc(), t)); + TestClosure((c, t) => c.AffinityCallAsync(null, 0, new ComputeFunc(), t)); + TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), 10, t)); + TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), t)); + TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), new ComputeReducer(), t)); + } + + private void TestTask(Func runner) + { + Job.CancelCount = 0; + + TestClosure(runner); + + Assert.IsTrue(TestUtils.WaitForCondition(() => Job.CancelCount > 0, 5000)); + } + + private void TestClosure(Func runner) + { + using (var cts = new CancellationTokenSource()) + { + var task = runner(Compute, cts.Token); + + Assert.IsFalse(task.IsCanceled); + + cts.Cancel(); + + Assert.IsTrue(task.IsCanceled); + + // Pass cancelled token + Assert.IsTrue(runner(Compute, cts.Token).IsCanceled); + } + } + + private class Task : IComputeTask>> + { + public IDictionary, IClusterNode> Map(IList subgrid, object arg) + { + return Enumerable.Range(1, 100) + .SelectMany(x => subgrid) + .ToDictionary(x => (IComputeJob)new Job(), x => x); + } + + public ComputeJobResultPolicy OnResult(IComputeJobResult res, IList> rcvd) + { + return ComputeJobResultPolicy.Wait; + } + + public IList> Reduce(IList> results) + { + Assert.Fail("Reduce should not be called on a cancelled task."); + return results; + } + } + + [Serializable] + private class Job : IComputeJob + { + private static int _cancelCount; + + public static int CancelCount + { + get { return Thread.VolatileRead(ref _cancelCount); } + set { Thread.VolatileWrite(ref _cancelCount, value); } + } + + public int Execute() + { + Thread.Sleep(50); + return 1; + } + + public void Cancel() + { + Interlocked.Increment(ref _cancelCount); + } + } + + [Serializable] + private class ComputeBiFunc : IComputeFunc + { + public int Invoke(int arg) + { + Thread.Sleep(50); + return arg; + } + } + + private class ComputeReducer : IComputeReducer + { + public bool Collect(int res) + { + return true; + } + + public int Reduce() + { + return 0; + } + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs index fe7d78f689d2e..26696b9424702 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs @@ -43,7 +43,7 @@ public class ComputeApiTest private const string BinaryArgTask = "org.apache.ignite.platform.PlatformComputeBinarizableArgTask"; /** Broadcast task name. */ - private const string BroadcastTask = "org.apache.ignite.platform.PlatformComputeBroadcastTask"; + public const string BroadcastTask = "org.apache.ignite.platform.PlatformComputeBroadcastTask"; /** Broadcast task name. */ private const string DecimalTask = "org.apache.ignite.platform.PlatformComputeDecimalTask";