diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-12-08 15:00:07 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-12-08 15:00:07 +0100 |
commit | 4f11c490da2bee48a40e80cfbbfda9914457043c (patch) | |
tree | f2f784afe2c80bc04b2b32b5b588ac65f5643226 /container-core | |
parent | 92fb7c5a6b124a9f384ed54cb5be166c14a5e910 (diff) |
Deprecate methods using Guava ListenableFuture
- com.yahoo.processing.Response.recursiveComplete()
- com.yahoo.processing.response.DataList.complete()
- com.yahoo.processing.response.IncomingData.completed()
Also fixes bug in FutureResponse where `get()` does not return value produced by async task.
Diffstat (limited to 'container-core')
15 files changed, 166 insertions, 87 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json index cdd6da944c5..27fa885ada4 100644 --- a/container-core/abi-spec.json +++ b/container-core/abi-spec.json @@ -2839,6 +2839,7 @@ "public void <init>(com.yahoo.processing.Request, com.yahoo.processing.request.ErrorMessage)", "public void mergeWith(com.yahoo.processing.Response)", "public com.yahoo.processing.response.DataList data()", + "public static java.util.concurrent.CompletableFuture recursiveFuture(com.yahoo.processing.response.DataList)", "public static com.google.common.util.concurrent.ListenableFuture recursiveComplete(com.yahoo.processing.response.DataList)" ], "fields": [] @@ -3390,7 +3391,7 @@ "fields": [] }, "com.yahoo.processing.response.AbstractDataList$DrainOnGetFuture": { - "superClass": "com.google.common.util.concurrent.AbstractFuture", + "superClass": "com.yahoo.processing.impl.ProcessingFuture", "interfaces": [], "attributes": [ "public", @@ -3402,8 +3403,8 @@ "public boolean isCancelled()", "public com.yahoo.processing.response.DataList get()", "public com.yahoo.processing.response.DataList get(long, java.util.concurrent.TimeUnit)", - "public bridge synthetic java.lang.Object get()", - "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)" + "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)", + "public bridge synthetic java.lang.Object get()" ], "fields": [] }, @@ -3425,6 +3426,7 @@ "public com.yahoo.processing.Request request()", "public com.yahoo.processing.response.IncomingData incoming()", "public com.google.common.util.concurrent.ListenableFuture complete()", + "public java.util.concurrent.CompletableFuture future()", "public boolean isOrdered()", "public boolean isStreamed()", "public java.lang.String toString()" @@ -3483,6 +3485,7 @@ "public abstract com.yahoo.processing.response.Data get(int)", "public abstract java.util.List asList()", "public abstract com.yahoo.processing.response.IncomingData incoming()", + "public abstract java.util.concurrent.CompletableFuture future()", "public abstract com.google.common.util.concurrent.ListenableFuture complete()", "public abstract void addDataListener(java.lang.Runnable)", "public void close()" @@ -3503,6 +3506,7 @@ "public final void assignOwner(com.yahoo.processing.response.DataList)", "public com.yahoo.processing.response.DataList getOwner()", "public com.google.common.util.concurrent.ListenableFuture completed()", + "public java.util.concurrent.CompletableFuture future()", "public synchronized boolean isComplete()", "public synchronized void addLast(com.yahoo.processing.response.Data)", "public synchronized void add(com.yahoo.processing.response.Data)", @@ -3516,26 +3520,29 @@ "fields": [] }, "com.yahoo.processing.response.FutureResponse": { - "superClass": "com.google.common.util.concurrent.ForwardingFuture", - "interfaces": [], + "superClass": "java.lang.Object", + "interfaces": [ + "java.util.concurrent.Future" + ], "attributes": [ "public" ], "methods": [ "public void <init>(java.util.concurrent.Callable, com.yahoo.processing.execution.Execution, com.yahoo.processing.Request)", - "public com.google.common.util.concurrent.ListenableFutureTask delegate()", + "public java.util.concurrent.FutureTask delegate()", + "public boolean cancel(boolean)", + "public boolean isCancelled()", + "public boolean isDone()", "public com.yahoo.processing.Response get()", "public com.yahoo.processing.Response get(long, java.util.concurrent.TimeUnit)", "public com.yahoo.processing.Request getRequest()", "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)", - "public bridge synthetic java.lang.Object get()", - "public bridge synthetic java.util.concurrent.Future delegate()", - "public bridge synthetic java.lang.Object delegate()" + "public bridge synthetic java.lang.Object get()" ], "fields": [] }, "com.yahoo.processing.response.IncomingData$NullIncomingData$ImmediateFuture": { - "superClass": "com.google.common.util.concurrent.AbstractFuture", + "superClass": "com.yahoo.processing.impl.ProcessingFuture", "interfaces": [], "attributes": [ "public" @@ -3547,8 +3554,8 @@ "public boolean isDone()", "public com.yahoo.processing.response.DataList get()", "public com.yahoo.processing.response.DataList get(long, java.util.concurrent.TimeUnit)", - "public bridge synthetic java.lang.Object get()", - "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)" + "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)", + "public bridge synthetic java.lang.Object get()" ], "fields": [] }, @@ -3564,6 +3571,7 @@ "methods": [ "public void <init>(com.yahoo.processing.response.DataList)", "public com.google.common.util.concurrent.ListenableFuture completed()", + "public java.util.concurrent.CompletableFuture future()", "public com.yahoo.processing.response.DataList getOwner()", "public boolean isComplete()", "public void addLast(com.yahoo.processing.response.Data)", @@ -3587,6 +3595,7 @@ ], "methods": [ "public abstract com.yahoo.processing.response.DataList getOwner()", + "public abstract java.util.concurrent.CompletableFuture future()", "public abstract com.google.common.util.concurrent.ListenableFuture completed()", "public abstract boolean isComplete()", "public abstract void addLast(com.yahoo.processing.response.Data)", diff --git a/container-core/src/main/java/com/yahoo/processing/Response.java b/container-core/src/main/java/com/yahoo/processing/Response.java index 0319a36f2f8..9ab95901320 100644 --- a/container-core/src/main/java/com/yahoo/processing/Response.java +++ b/container-core/src/main/java/com/yahoo/processing/Response.java @@ -1,12 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing; -import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.CompletableFutures; import com.yahoo.concurrent.SystemTimer; import com.yahoo.processing.execution.ResponseReceiver; +import com.yahoo.processing.impl.ProcessingFuture; import com.yahoo.processing.request.CompoundName; import com.yahoo.processing.request.ErrorMessage; import com.yahoo.processing.response.ArrayDataList; @@ -15,8 +15,8 @@ import com.yahoo.processing.response.DataList; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,7 +57,7 @@ public class Response extends ListenableFreezableClass { if (freezeListener != null) { if (freezeListener instanceof ResponseReceiver) ((ResponseReceiver)freezeListener).setResponse(this); - data.addFreezeListener(freezeListener, MoreExecutors.directExecutor()); + data.addFreezeListener(freezeListener, Runnable::run); } } @@ -96,15 +96,22 @@ public class Response extends ListenableFreezableClass { * @param rootDataList the list to complete recursively * @return the future in which all data in and below this list is complete, as the given root dataList for convenience */ - public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) { - List<ListenableFuture<DataList<D>>> futures = new ArrayList<>(); + public static <D extends Data> CompletableFuture<DataList<D>> recursiveFuture(DataList<D> rootDataList) { + List<CompletableFuture<DataList<D>>> futures = new ArrayList<>(); collectCompletionFutures(rootDataList, futures); return new CompleteAllOnGetFuture<D>(futures); } + /** @deprecated Use {@link #recursiveFuture(DataList)} instead */ + @Deprecated(forRemoval = true, since = "7") + @SuppressWarnings("removal") + public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) { + return CompletableFutures.toGuavaListenableFuture(recursiveFuture(rootDataList)); + } + @SuppressWarnings("unchecked") - private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) { - futures.add(dataList.complete()); + private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<CompletableFuture<DataList<D>>> futures) { + futures.add(dataList.future()); for (D data : dataList.asList()) { if (data instanceof DataList) collectCompletionFutures((DataList<D>) data, futures); @@ -115,24 +122,24 @@ public class Response extends ListenableFreezableClass { * A future which on get calls get on all its given futures and sets the value returned from the * first given future as its result. */ - private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> { + private static class CompleteAllOnGetFuture<D extends Data> extends ProcessingFuture<DataList<D>> { - private final List<ListenableFuture<DataList<D>>> futures; + private final List<CompletableFuture<DataList<D>>> futures; - public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) { + public CompleteAllOnGetFuture(List<CompletableFuture<DataList<D>>> futures) { this.futures = new ArrayList<>(futures); } @Override public DataList<D> get() throws InterruptedException, ExecutionException { DataList<D> result = null; - for (ListenableFuture<DataList<D>> future : futures) { + for (CompletableFuture<DataList<D>> future : futures) { if (result == null) result = future.get(); else future.get(); } - set(result); + complete(result); return result; } @@ -141,7 +148,7 @@ public class Response extends ListenableFreezableClass { DataList<D> result = null; long timeLeft = unit.toMillis(timeout); long currentCallStart = SystemTimer.INSTANCE.milliTime(); - for (ListenableFuture<DataList<D>> future : futures) { + for (CompletableFuture<DataList<D>> future : futures) { if (result == null) result = future.get(timeLeft, TimeUnit.MILLISECONDS); else @@ -151,7 +158,7 @@ public class Response extends ListenableFreezableClass { if (timeLeft <= 0) break; currentCallStart = currentCallEnd; } - set(result); + complete(result); return result; } diff --git a/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java new file mode 100644 index 00000000000..ab597fffaff --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.impl; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A {@link CompletableFuture} where {@link #get()}/{@link #get(long, TimeUnit)} may have side-effects (e.g trigger the underlying computation). + * + * @author bjorncs + */ +// TODO Vespa 8 remove ListenableFuture implementation +public abstract class ProcessingFuture<V> extends CompletableFuture<V> implements ListenableFuture<V> { + + @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } + @Override public boolean isCancelled() { return false; } + + @Override public abstract V get() throws InterruptedException, ExecutionException; + @Override public abstract V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; + + @Override + public void addListener(Runnable listener, Executor executor) { + whenCompleteAsync((__, ___) -> listener.run(), executor); + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java index 21375ee3d76..92df00711f3 100644 --- a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java +++ b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java @@ -2,7 +2,6 @@ package com.yahoo.processing.rendering; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.concurrent.CompletableFutures; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.jdisc.handler.CompletionHandler; @@ -257,7 +256,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e * inadvertently work ends up in async data producing threads in some cases. */ Executor getExecutor() { - return beforeHandoverMode ? MoreExecutors.directExecutor() : renderingExecutor; + return beforeHandoverMode ? Runnable::run : renderingExecutor; } /** For inspection only; use getExecutor() for execution */ Executor getRenderingExecutor() { return renderingExecutor; } @@ -360,10 +359,10 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e return; // Called on completion of a list which is not frozen yet - hold off until frozen if ( ! beforeHandoverMode) - list.complete().get(); // trigger completion if not done already to invoke any listeners on that event + list.future().get(); // trigger completion if not done already to invoke any listeners on that event boolean startedRendering = renderData(); if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first - if (list.complete().isDone()) // might not be when in before handover mode + if (list.future().isDone()) // might not be when in before handover mode endListLevel(); else stream.flush(); @@ -445,8 +444,8 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e flushIfLikelyToSuspend(subList); subList.addFreezeListener(listListener, getExecutor()); - subList.complete().addListener(listListener, getExecutor()); - subList.incoming().completed().addListener(listListener, getExecutor()); + subList.future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor()); + subList.incoming().future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor()); } private boolean isOrdered(DataList dataList) { diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java index 4633ac5ec1c..ef7aea4716f 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java +++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java @@ -1,15 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.CompletableFutures; import com.yahoo.processing.Request; +import com.yahoo.processing.impl.ProcessingFuture; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,7 +32,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ private final IncomingData<DATATYPE> incomingData; - private final ListenableFuture<DataList<DATATYPE>> completedFuture; + private final CompletableFuture<DataList<DATATYPE>> completedFuture; /** * Creates a simple data list which does not allow late incoming data @@ -94,10 +92,15 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable return incomingData; } + @Override + @SuppressWarnings("removal") + @Deprecated(forRemoval = true, since = "7") public ListenableFuture<DataList<DATATYPE>> complete() { - return completedFuture; + return CompletableFutures.toGuavaListenableFuture(completedFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completedFuture; } + @Override public boolean isOrdered() { return ordered; } @@ -108,7 +111,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]"); } - public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + public static final class DrainOnGetFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> { private final DataList<DATATYPE> owner; @@ -137,7 +140,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ @Override public DataList<DATATYPE> get() throws InterruptedException, ExecutionException { - return drain(owner.incoming().completed().get()); + return drain(owner.incoming().future().get()); } /** @@ -146,13 +149,13 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ @Override public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - return drain(owner.incoming().completed().get(timeout, timeUnit)); + return drain(owner.incoming().future().get(timeout, timeUnit)); } private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) { for (DATATYPE item : dataList.incoming().drain()) dataList.add(item); - set(dataList); // Signal completion to listeners + complete(dataList); // Signal completion to listeners return dataList; } diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java index d566e201375..d30fe4f1a0e 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/DataList.java +++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java @@ -1,11 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.CompletableFuture; /** * A list of data items created due to a processing request. @@ -73,6 +72,10 @@ public interface DataList<DATATYPE extends Data> extends Data { * Making this call on a list which does not support future data always returns immediately and * causes no memory synchronization cost. */ + CompletableFuture<DataList<DATATYPE>> future(); + + /** @deprecated Use {@link #future()} instead */ + @Deprecated(forRemoval = true, since = "7") ListenableFuture<DataList<DATATYPE>> complete(); /** diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java index 619e554f45c..0a2bcdc90b0 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java +++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java @@ -2,12 +2,13 @@ package com.yahoo.processing.response; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.collections.Tuple2; +import com.yahoo.concurrent.CompletableFutures; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** @@ -19,7 +20,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< private DataList<DATATYPE> owner = null; - private final SettableFuture<DataList<DATATYPE>> completionFuture; + private final CompletableFuture<DataList<DATATYPE>> completionFuture; private final List<DATATYPE> dataList = new ArrayList<>(); @@ -35,7 +36,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< public DefaultIncomingData(DataList<DATATYPE> owner) { assignOwner(owner); - completionFuture = SettableFuture.create(); + completionFuture = new CompletableFuture<>(); } /** Assigns the owner of this. Throws an exception if the owner is already set. */ @@ -50,10 +51,14 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< } @Override + @Deprecated(forRemoval = true, since = "7") + @SuppressWarnings("removal") public ListenableFuture<DataList<DATATYPE>> completed() { - return completionFuture; + return CompletableFutures.toGuavaListenableFuture(completionFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; } + /** Returns whether the data in this is complete */ @Override public synchronized boolean isComplete() { @@ -92,7 +97,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< @Override public synchronized void markComplete() { complete = true; - completionFuture.set(owner); + completionFuture.complete(owner); } /** diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java index d589b7dd195..25c230e383f 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java +++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.ForwardingFuture; -import com.google.common.util.concurrent.ListenableFutureTask; import com.yahoo.processing.Request; import com.yahoo.processing.Response; import com.yahoo.processing.execution.Execution; @@ -10,6 +8,8 @@ import com.yahoo.processing.request.ErrorMessage; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; @@ -20,9 +20,10 @@ import java.util.logging.Logger; * * @author bratseth */ -public class FutureResponse extends ForwardingFuture<Response> { +public class FutureResponse implements Future<Response> { private final Request request; + private final FutureTask<Response> task; /** * Only used for generating messages @@ -31,24 +32,23 @@ public class FutureResponse extends ForwardingFuture<Response> { private final static Logger log = Logger.getLogger(FutureResponse.class.getName()); - private final ListenableFutureTask<Response> futureTask; - public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) { - this.futureTask = ListenableFutureTask.create(callable); + this.task = new FutureTask<>(callable); this.request = request; this.execution = execution; } - @Override - public ListenableFutureTask<Response> delegate() { - return futureTask; - } + public FutureTask<Response> delegate() { return task; } + + @Override public boolean cancel(boolean mayInterruptIfRunning) { return task.cancel(mayInterruptIfRunning); } + @Override public boolean isCancelled() { return task.isCancelled(); } + @Override public boolean isDone() { return task.isDone(); } public @Override Response get() { try { - return super.get(); + return task.get(); } catch (InterruptedException e) { return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); } catch (ExecutionException e) { @@ -61,7 +61,7 @@ public class FutureResponse extends ForwardingFuture<Response> { @Override Response get(long timeout, TimeUnit timeunit) { try { - return super.get(timeout, timeunit); + return task.get(timeout, timeunit); } catch (InterruptedException e) { return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); } catch (ExecutionException e) { diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java index 371c1bca45f..c8a013fbda9 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java +++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java @@ -1,11 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.CompletableFutures; +import com.yahoo.processing.impl.ProcessingFuture; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -35,6 +37,10 @@ public interface IncomingData<DATATYPE extends Data> { * <p> * This return the list owning this for convenience. */ + CompletableFuture<DataList<DATATYPE>> future(); + + /** @deprecated Use {@link #future()} instead */ + @Deprecated(forRemoval = true, since = "7") ListenableFuture<DataList<DATATYPE>> completed(); /** @@ -108,10 +114,15 @@ public interface IncomingData<DATATYPE extends Data> { completionFuture = new ImmediateFuture<>(owner); } + @Override + @SuppressWarnings("removal") + @Deprecated(forRemoval = true, since = "7") public ListenableFuture<DataList<DATATYPE>> completed() { - return completionFuture; + return CompletableFutures.toGuavaListenableFuture(completionFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; } + @Override public DataList<DATATYPE> getOwner() { return owner; @@ -178,13 +189,13 @@ public interface IncomingData<DATATYPE extends Data> { * This is semantically the same as Futures.immediateFuture but contrary to it, * this never causes any memory synchronization when accessed. */ - public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> { - private DataList<DATATYPE> owner; + private final DataList<DATATYPE> owner; public ImmediateFuture(DataList<DATATYPE> owner) { this.owner = owner; // keep here to avoid memory synchronization for access - set(owner); // Signal completion (for future listeners) + complete(owner); // Signal completion (for future listeners) } @Override diff --git a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java index aebbc3f538d..5a672b77762 100644 --- a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java +++ b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.test; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.component.chain.Chain; import com.yahoo.processing.Processor; import com.yahoo.processing.Request; @@ -15,6 +13,7 @@ import com.yahoo.processing.request.ErrorMessage; import com.yahoo.processing.response.*; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** @@ -288,7 +287,7 @@ public class ProcessorLibrary { private final boolean ordered, streamed; /** The incoming data this has created */ - public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>(); /** Create an instance which returns ordered, streamable data */ public ListenableFutureDataSource() { this(true, true); } @@ -307,7 +306,7 @@ public class ProcessorLibrary { dataList = ArrayDataList.createAsyncNonstreamed(request); else dataList = ArrayDataList.createAsync(request); - incomingData.set(dataList.incoming()); + incomingData.complete(dataList.incoming()); return new Response(dataList); } @@ -317,12 +316,12 @@ public class ProcessorLibrary { public static class RequestCounter extends Processor { /** The incoming data this has created */ - public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>(); @Override public Response process(Request request, Execution execution) { ArrayDataList dataList = ArrayDataList.createAsync(request); - incomingData.set(dataList.incoming()); + incomingData.complete(dataList.incoming()); return new Response(dataList); } @@ -354,7 +353,7 @@ public class ProcessorLibrary { // wait for other executions and merge the responses for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) { - additionalResponse.data().complete().get(); // block until we have all the data elements + additionalResponse.data().future().get(); // block until we have all the data elements for (Object item : additionalResponse.data().asList()) response.data().add((Data) item); response.mergeWith(additionalResponse); @@ -382,9 +381,10 @@ public class ProcessorLibrary { public Response process(Request request, Execution execution) { Response response = execution.process(request); // TODO: Consider for to best provide helpers for this - response.data().complete().addListener(new RunnableExecution(request, - new ExecutionWithResponse(asyncChain, response, execution)), - MoreExecutors.directExecutor()); + response.data().future().whenComplete( + (__, ___) -> + new RunnableExecution(request, new ExecutionWithResponse(asyncChain, response, execution)) + .run()); return response; } diff --git a/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java b/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java index 0f16aed3d0b..efcf608b6f0 100644 --- a/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java +++ b/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java @@ -22,7 +22,7 @@ public class ResponseTestCase { * Check the recursive toString printing along the way. * List variable names ends by numbers specifying the index of the list at each level. */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testRecursiveCompletionAndToString() throws InterruptedException, ExecutionException { // create lists diff --git a/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java b/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java index 40e7384c745..2fb32271419 100644 --- a/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java +++ b/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java @@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals; public class FutureDataTestCase { /** Run a chain which ends in a processor which returns a response containing future data. */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testFutureDataPassThrough() throws InterruptedException, ExecutionException, TimeoutException { // Set up @@ -52,7 +52,7 @@ public class FutureDataTestCase { } /** Federate to one source which returns data immediately and one who return future data */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testFederateSyncAndAsyncData() throws InterruptedException, ExecutionException, TimeoutException { // Set up @@ -88,7 +88,7 @@ public class FutureDataTestCase { } /** Register a chain which will be called when some async data is available */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testAsyncDataProcessing() throws InterruptedException, ExecutionException, TimeoutException { // Set up @@ -120,7 +120,7 @@ public class FutureDataTestCase { * When the first of the futures are done one additional chain is to be run. * When both are done another chain is to be run. */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testAsyncDataProcessingOfFederatedResult() throws InterruptedException, ExecutionException, TimeoutException { // Set up diff --git a/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java b/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java index 1ebf01c5f33..bd1307ff77c 100644 --- a/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java +++ b/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java @@ -13,7 +13,6 @@ import com.yahoo.processing.test.ProcessorLibrary; import org.junit.Test; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -27,7 +26,7 @@ import static org.junit.Assert.assertEquals; public class StreamingTestCase { /** Tests adding a chain which is called every time new data is added to a data list */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "removal"}) @Test public void testStreamingData() throws InterruptedException, ExecutionException, TimeoutException { // Set up diff --git a/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java b/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java index 50864c8b034..52f0c457ce2 100644 --- a/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java +++ b/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -407,6 +408,7 @@ public class AsynchronousSectionedRendererTest { } @Override + @SuppressWarnings("removal") public ListenableFuture<DataList<StringData>> complete() { return new ListenableFuture<DataList<StringData>>() { @Override @@ -442,6 +444,11 @@ public class AsynchronousSectionedRendererTest { } @Override + public CompletableFuture<DataList<StringData>> future() { + return CompletableFuture.completedFuture(this); + } + + @Override public String getString() { return list.toString(); } diff --git a/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java b/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java index 67a6634b659..21731f7d714 100644 --- a/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java +++ b/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java @@ -3,8 +3,12 @@ package com.yahoo.processing.test.documentation; import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.component.chain.Chain; -import com.yahoo.processing.*; -import com.yahoo.processing.execution.*; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.execution.ExecutionWithResponse; +import com.yahoo.processing.execution.RunnableExecution; /** * A processor which registers a listener on the future completion of @@ -18,6 +22,7 @@ public class AsyncDataProcessingInitiator extends Processor { this.asyncChain=asyncChain; } + @SuppressWarnings({"removal"}) @Override public Response process(Request request, Execution execution) { Response response=execution.process(request); |