Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.*;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -194,26 +193,13 @@ public PlatformContext platformContext() {

/** {@inheritDoc} */
@Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception {
return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
}

/** {@inheritDoc} */
@Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId)
throws Exception {
return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
}

/**
* Get current future with proper exception conversions.
*
* @return Future.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException {
IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture();

return fut.internalFuture();
return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
}

/**
Expand All @@ -222,8 +208,8 @@ protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedExcept
* @return current future.
* @throws IgniteCheckedException
*/
protected IgniteFuture currentFuture() throws IgniteCheckedException {
throw new IgniteCheckedException("Future listening is not supported in " + this.getClass());
protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
throw new IgniteCheckedException("Future listening is not supported in " + getClass());
}

/**
Expand All @@ -232,7 +218,7 @@ protected IgniteFuture currentFuture() throws IgniteCheckedException {
* @param opId Operation id.
* @return A custom writer for given op id.
*/
protected @Nullable PlatformFutureUtils.Writer futureWriter(int opId){
@Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
Expand All @@ -43,6 +44,7 @@
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -684,8 +686,8 @@ private static void writeError(BinaryRawWriterEx writer, Exception ex) {
}

/** <inheritDoc /> */
@Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
return cache.future();
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)cache.future()).internalFuture();
}

/** <inheritDoc /> */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@

package org.apache.ignite.internal.processors.platform.compute;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.internal.IgniteComputeImpl;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.utils.*;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.binary.BinaryObject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;

Expand Down Expand Up @@ -62,7 +66,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
private final IgniteComputeImpl compute;

/** Future for previous asynchronous operation. */
protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>();
/**
* Constructor.
*
Expand Down Expand Up @@ -213,8 +217,8 @@ public void withNoFailover() {
}

/** <inheritDoc /> */
@Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
IgniteFuture<?> fut = curFut.get();
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
IgniteInternalFuture fut = curFut.get();

if (fut == null)
throw new IllegalStateException("Asynchronous operation not started.");
Expand Down Expand Up @@ -272,13 +276,7 @@ protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) {
Object res = compute0.execute(taskName, arg);

if (async) {
curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
private static final long serialVersionUID = 0L;

@Override public Object apply(IgniteFuture fut) {
return toBinary(fut.get());
}
}));
curFut.set(new ComputeConvertingFuture(compute0.future()));

return null;
}
Expand Down Expand Up @@ -327,4 +325,113 @@ protected IgniteCompute computeForTask(Collection<UUID> nodeIds) {
return nodeIds == null ? compute :
platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
}

/**
* Wraps ComputeTaskFuture as IgniteInternalFuture.
*/
protected class ComputeConvertingFuture implements IgniteInternalFuture {
/** */
private final IgniteInternalFuture fut;

/**
* Ctor.
*
* @param fut Future to wrap.
*/
public ComputeConvertingFuture(ComputeTaskFuture fut) {
this.fut = ((IgniteFutureImpl)fut).internalFuture();
}

/** {@inheritDoc} */
@Override public Object get() throws IgniteCheckedException {
return convertResult(fut.get());
}

/** {@inheritDoc} */
@Override public Object get(long timeout) throws IgniteCheckedException {
return convertResult(fut.get(timeout));
}

/** {@inheritDoc} */
@Override public Object get(long timeout, TimeUnit unit) throws IgniteCheckedException {
return convertResult(fut.get(timeout, unit));
}

/** {@inheritDoc} */
@Override public Object getUninterruptibly() throws IgniteCheckedException {
return convertResult(fut.get());
}

/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
return fut.cancel();
}

/** {@inheritDoc} */
@Override public boolean isDone() {
return fut.isDone();
}

/** {@inheritDoc} */
@Override public boolean isCancelled() {
return fut.isCancelled();
}

/** {@inheritDoc} */
@Override public long startTime() {
return fut.startTime();
}

/** {@inheritDoc} */
@Override public long duration() {
return fut.duration();
}

/** {@inheritDoc} */
@Override public void listen(final IgniteInClosure lsnr) {
fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
private static final long serialVersionUID = 0L;

@Override public void apply(IgniteInternalFuture fut0) {
lsnr.apply(ComputeConvertingFuture.this);
}
});
}

/** {@inheritDoc} */
@Override public IgniteInternalFuture chain(IgniteClosure doneCb) {
return null; // not supported
}

/** {@inheritDoc} */
@Override public Throwable error() {
try {
fut.get();
return null;
}
catch (Throwable e) {
return e;
}
}

/** {@inheritDoc} */
@Override public Object result() {
try {
return fut.get();
}
catch (Throwable ignored) {
return null;
}
}

/**
* Converts future result.
*
* @param obj Object to convert.
* @return Result.
*/
protected Object convertResult(Object obj) {
return toBinary(obj);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@

package org.apache.ignite.internal.processors.platform.events;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventAdapter;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;

/**
* Interop events.
*/
Expand Down Expand Up @@ -269,8 +271,8 @@ public boolean isEnabled(int type) {
}

/** <inheritDoc /> */
@Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
return events.future();
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)events.future()).internalFuture();
}

/** <inheritDoc /> */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;

import java.util.UUID;

Expand Down Expand Up @@ -160,7 +161,7 @@ public PlatformMessaging withAsync() {
}

/** <inheritDoc /> */
@Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
return messaging.future();
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)messaging.future()).internalFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.ignite.internal.processors.platform.services;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
Expand All @@ -31,12 +29,16 @@
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDescriptor;

import java.util.Collection;
import java.util.Map;
import java.util.UUID;

/**
* Interop services.
*/
Expand Down Expand Up @@ -269,7 +271,7 @@ public Object dotNetServiceProxy(String name, boolean sticky) {
}

/** <inheritDoc /> */
@Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
return services.future();
@Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
return ((IgniteFutureImpl)services.future()).internalFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ private static class BroadcastJob extends ComputeJobAdapter {

/** {@inheritDoc} */
@Nullable @Override public Object execute() {
try {
Thread.sleep(50); // Short sleep for cancellation tests.
}
catch (InterruptedException ignored) {
// No-op.
}

return ignite.cluster().localNode().id();
}
}
Expand Down
Loading