aboutsummaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-12-08 15:00:07 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-12-08 15:00:07 +0100
commit4f11c490da2bee48a40e80cfbbfda9914457043c (patch)
treef2f784afe2c80bc04b2b32b5b588ac65f5643226 /container-core
parent92fb7c5a6b124a9f384ed54cb5be166c14a5e910 (diff)
Deprecate methods using Guava ListenableFuture
- com.yahoo.processing.Response.recursiveComplete() - com.yahoo.processing.response.DataList.complete() - com.yahoo.processing.response.IncomingData.completed() Also fixes bug in FutureResponse where `get()` does not return value produced by async task.
Diffstat (limited to 'container-core')
-rw-r--r--container-core/abi-spec.json33
-rw-r--r--container-core/src/main/java/com/yahoo/processing/Response.java37
-rw-r--r--container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java31
-rw-r--r--container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java11
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java25
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DataList.java7
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java15
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java24
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/IncomingData.java21
-rw-r--r--container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java20
-rw-r--r--container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java2
-rw-r--r--container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java8
-rw-r--r--container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java3
-rw-r--r--container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java7
-rw-r--r--container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java9
15 files changed, 166 insertions, 87 deletions
diff --git a/container-core/abi-spec.json b/container-core/abi-spec.json
index cdd6da944c5..27fa885ada4 100644
--- a/container-core/abi-spec.json
+++ b/container-core/abi-spec.json
@@ -2839,6 +2839,7 @@
"public void <init>(com.yahoo.processing.Request, com.yahoo.processing.request.ErrorMessage)",
"public void mergeWith(com.yahoo.processing.Response)",
"public com.yahoo.processing.response.DataList data()",
+ "public static java.util.concurrent.CompletableFuture recursiveFuture(com.yahoo.processing.response.DataList)",
"public static com.google.common.util.concurrent.ListenableFuture recursiveComplete(com.yahoo.processing.response.DataList)"
],
"fields": []
@@ -3390,7 +3391,7 @@
"fields": []
},
"com.yahoo.processing.response.AbstractDataList$DrainOnGetFuture": {
- "superClass": "com.google.common.util.concurrent.AbstractFuture",
+ "superClass": "com.yahoo.processing.impl.ProcessingFuture",
"interfaces": [],
"attributes": [
"public",
@@ -3402,8 +3403,8 @@
"public boolean isCancelled()",
"public com.yahoo.processing.response.DataList get()",
"public com.yahoo.processing.response.DataList get(long, java.util.concurrent.TimeUnit)",
- "public bridge synthetic java.lang.Object get()",
- "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)"
+ "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)",
+ "public bridge synthetic java.lang.Object get()"
],
"fields": []
},
@@ -3425,6 +3426,7 @@
"public com.yahoo.processing.Request request()",
"public com.yahoo.processing.response.IncomingData incoming()",
"public com.google.common.util.concurrent.ListenableFuture complete()",
+ "public java.util.concurrent.CompletableFuture future()",
"public boolean isOrdered()",
"public boolean isStreamed()",
"public java.lang.String toString()"
@@ -3483,6 +3485,7 @@
"public abstract com.yahoo.processing.response.Data get(int)",
"public abstract java.util.List asList()",
"public abstract com.yahoo.processing.response.IncomingData incoming()",
+ "public abstract java.util.concurrent.CompletableFuture future()",
"public abstract com.google.common.util.concurrent.ListenableFuture complete()",
"public abstract void addDataListener(java.lang.Runnable)",
"public void close()"
@@ -3503,6 +3506,7 @@
"public final void assignOwner(com.yahoo.processing.response.DataList)",
"public com.yahoo.processing.response.DataList getOwner()",
"public com.google.common.util.concurrent.ListenableFuture completed()",
+ "public java.util.concurrent.CompletableFuture future()",
"public synchronized boolean isComplete()",
"public synchronized void addLast(com.yahoo.processing.response.Data)",
"public synchronized void add(com.yahoo.processing.response.Data)",
@@ -3516,26 +3520,29 @@
"fields": []
},
"com.yahoo.processing.response.FutureResponse": {
- "superClass": "com.google.common.util.concurrent.ForwardingFuture",
- "interfaces": [],
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "java.util.concurrent.Future"
+ ],
"attributes": [
"public"
],
"methods": [
"public void <init>(java.util.concurrent.Callable, com.yahoo.processing.execution.Execution, com.yahoo.processing.Request)",
- "public com.google.common.util.concurrent.ListenableFutureTask delegate()",
+ "public java.util.concurrent.FutureTask delegate()",
+ "public boolean cancel(boolean)",
+ "public boolean isCancelled()",
+ "public boolean isDone()",
"public com.yahoo.processing.Response get()",
"public com.yahoo.processing.Response get(long, java.util.concurrent.TimeUnit)",
"public com.yahoo.processing.Request getRequest()",
"public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)",
- "public bridge synthetic java.lang.Object get()",
- "public bridge synthetic java.util.concurrent.Future delegate()",
- "public bridge synthetic java.lang.Object delegate()"
+ "public bridge synthetic java.lang.Object get()"
],
"fields": []
},
"com.yahoo.processing.response.IncomingData$NullIncomingData$ImmediateFuture": {
- "superClass": "com.google.common.util.concurrent.AbstractFuture",
+ "superClass": "com.yahoo.processing.impl.ProcessingFuture",
"interfaces": [],
"attributes": [
"public"
@@ -3547,8 +3554,8 @@
"public boolean isDone()",
"public com.yahoo.processing.response.DataList get()",
"public com.yahoo.processing.response.DataList get(long, java.util.concurrent.TimeUnit)",
- "public bridge synthetic java.lang.Object get()",
- "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)"
+ "public bridge synthetic java.lang.Object get(long, java.util.concurrent.TimeUnit)",
+ "public bridge synthetic java.lang.Object get()"
],
"fields": []
},
@@ -3564,6 +3571,7 @@
"methods": [
"public void <init>(com.yahoo.processing.response.DataList)",
"public com.google.common.util.concurrent.ListenableFuture completed()",
+ "public java.util.concurrent.CompletableFuture future()",
"public com.yahoo.processing.response.DataList getOwner()",
"public boolean isComplete()",
"public void addLast(com.yahoo.processing.response.Data)",
@@ -3587,6 +3595,7 @@
],
"methods": [
"public abstract com.yahoo.processing.response.DataList getOwner()",
+ "public abstract java.util.concurrent.CompletableFuture future()",
"public abstract com.google.common.util.concurrent.ListenableFuture completed()",
"public abstract boolean isComplete()",
"public abstract void addLast(com.yahoo.processing.response.Data)",
diff --git a/container-core/src/main/java/com/yahoo/processing/Response.java b/container-core/src/main/java/com/yahoo/processing/Response.java
index 0319a36f2f8..9ab95901320 100644
--- a/container-core/src/main/java/com/yahoo/processing/Response.java
+++ b/container-core/src/main/java/com/yahoo/processing/Response.java
@@ -1,12 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.processing.execution.ResponseReceiver;
+import com.yahoo.processing.impl.ProcessingFuture;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.processing.request.ErrorMessage;
import com.yahoo.processing.response.ArrayDataList;
@@ -15,8 +15,8 @@ import com.yahoo.processing.response.DataList;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class Response extends ListenableFreezableClass {
if (freezeListener != null) {
if (freezeListener instanceof ResponseReceiver)
((ResponseReceiver)freezeListener).setResponse(this);
- data.addFreezeListener(freezeListener, MoreExecutors.directExecutor());
+ data.addFreezeListener(freezeListener, Runnable::run);
}
}
@@ -96,15 +96,22 @@ public class Response extends ListenableFreezableClass {
* @param rootDataList the list to complete recursively
* @return the future in which all data in and below this list is complete, as the given root dataList for convenience
*/
- public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) {
- List<ListenableFuture<DataList<D>>> futures = new ArrayList<>();
+ public static <D extends Data> CompletableFuture<DataList<D>> recursiveFuture(DataList<D> rootDataList) {
+ List<CompletableFuture<DataList<D>>> futures = new ArrayList<>();
collectCompletionFutures(rootDataList, futures);
return new CompleteAllOnGetFuture<D>(futures);
}
+ /** @deprecated Use {@link #recursiveFuture(DataList)} instead */
+ @Deprecated(forRemoval = true, since = "7")
+ @SuppressWarnings("removal")
+ public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) {
+ return CompletableFutures.toGuavaListenableFuture(recursiveFuture(rootDataList));
+ }
+
@SuppressWarnings("unchecked")
- private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) {
- futures.add(dataList.complete());
+ private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<CompletableFuture<DataList<D>>> futures) {
+ futures.add(dataList.future());
for (D data : dataList.asList()) {
if (data instanceof DataList)
collectCompletionFutures((DataList<D>) data, futures);
@@ -115,24 +122,24 @@ public class Response extends ListenableFreezableClass {
* A future which on get calls get on all its given futures and sets the value returned from the
* first given future as its result.
*/
- private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> {
+ private static class CompleteAllOnGetFuture<D extends Data> extends ProcessingFuture<DataList<D>> {
- private final List<ListenableFuture<DataList<D>>> futures;
+ private final List<CompletableFuture<DataList<D>>> futures;
- public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) {
+ public CompleteAllOnGetFuture(List<CompletableFuture<DataList<D>>> futures) {
this.futures = new ArrayList<>(futures);
}
@Override
public DataList<D> get() throws InterruptedException, ExecutionException {
DataList<D> result = null;
- for (ListenableFuture<DataList<D>> future : futures) {
+ for (CompletableFuture<DataList<D>> future : futures) {
if (result == null)
result = future.get();
else
future.get();
}
- set(result);
+ complete(result);
return result;
}
@@ -141,7 +148,7 @@ public class Response extends ListenableFreezableClass {
DataList<D> result = null;
long timeLeft = unit.toMillis(timeout);
long currentCallStart = SystemTimer.INSTANCE.milliTime();
- for (ListenableFuture<DataList<D>> future : futures) {
+ for (CompletableFuture<DataList<D>> future : futures) {
if (result == null)
result = future.get(timeLeft, TimeUnit.MILLISECONDS);
else
@@ -151,7 +158,7 @@ public class Response extends ListenableFreezableClass {
if (timeLeft <= 0) break;
currentCallStart = currentCallEnd;
}
- set(result);
+ complete(result);
return result;
}
diff --git a/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java
new file mode 100644
index 00000000000..ab597fffaff
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.impl;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link CompletableFuture} where {@link #get()}/{@link #get(long, TimeUnit)} may have side-effects (e.g trigger the underlying computation).
+ *
+ * @author bjorncs
+ */
+// TODO Vespa 8 remove ListenableFuture implementation
+public abstract class ProcessingFuture<V> extends CompletableFuture<V> implements ListenableFuture<V> {
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+ @Override public boolean isCancelled() { return false; }
+
+ @Override public abstract V get() throws InterruptedException, ExecutionException;
+ @Override public abstract V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ whenCompleteAsync((__, ___) -> listener.run(), executor);
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
index 21375ee3d76..92df00711f3 100644
--- a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
+++ b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
@@ -2,7 +2,6 @@
package com.yahoo.processing.rendering;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jdisc.handler.CompletionHandler;
@@ -257,7 +256,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
* inadvertently work ends up in async data producing threads in some cases.
*/
Executor getExecutor() {
- return beforeHandoverMode ? MoreExecutors.directExecutor() : renderingExecutor;
+ return beforeHandoverMode ? Runnable::run : renderingExecutor;
}
/** For inspection only; use getExecutor() for execution */
Executor getRenderingExecutor() { return renderingExecutor; }
@@ -360,10 +359,10 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
return; // Called on completion of a list which is not frozen yet - hold off until frozen
if ( ! beforeHandoverMode)
- list.complete().get(); // trigger completion if not done already to invoke any listeners on that event
+ list.future().get(); // trigger completion if not done already to invoke any listeners on that event
boolean startedRendering = renderData();
if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first
- if (list.complete().isDone()) // might not be when in before handover mode
+ if (list.future().isDone()) // might not be when in before handover mode
endListLevel();
else
stream.flush();
@@ -445,8 +444,8 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
flushIfLikelyToSuspend(subList);
subList.addFreezeListener(listListener, getExecutor());
- subList.complete().addListener(listListener, getExecutor());
- subList.incoming().completed().addListener(listListener, getExecutor());
+ subList.future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
+ subList.incoming().future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
}
private boolean isOrdered(DataList dataList) {
diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
index 4633ac5ec1c..ef7aea4716f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
@@ -1,15 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.processing.Request;
+import com.yahoo.processing.impl.ProcessingFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,7 +32,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
private final IncomingData<DATATYPE> incomingData;
- private final ListenableFuture<DataList<DATATYPE>> completedFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completedFuture;
/**
* Creates a simple data list which does not allow late incoming data
@@ -94,10 +92,15 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return incomingData;
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> complete() {
- return completedFuture;
+ return CompletableFutures.toGuavaListenableFuture(completedFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completedFuture; }
+
@Override
public boolean isOrdered() { return ordered; }
@@ -108,7 +111,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]");
}
- public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static final class DrainOnGetFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
private final DataList<DATATYPE> owner;
@@ -137,7 +140,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get() throws InterruptedException, ExecutionException {
- return drain(owner.incoming().completed().get());
+ return drain(owner.incoming().future().get());
}
/**
@@ -146,13 +149,13 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
- return drain(owner.incoming().completed().get(timeout, timeUnit));
+ return drain(owner.incoming().future().get(timeout, timeUnit));
}
private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) {
for (DATATYPE item : dataList.incoming().drain())
dataList.add(item);
- set(dataList); // Signal completion to listeners
+ complete(dataList); // Signal completion to listeners
return dataList;
}
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
index d566e201375..d30fe4f1a0e 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
@@ -1,11 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletableFuture;
/**
* A list of data items created due to a processing request.
@@ -73,6 +72,10 @@ public interface DataList<DATATYPE extends Data> extends Data {
* Making this call on a list which does not support future data always returns immediately and
* causes no memory synchronization cost.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> complete();
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
index 619e554f45c..0a2bcdc90b0 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
@@ -2,12 +2,13 @@
package com.yahoo.processing.response;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.collections.Tuple2;
+import com.yahoo.concurrent.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -19,7 +20,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
private DataList<DATATYPE> owner = null;
- private final SettableFuture<DataList<DATATYPE>> completionFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completionFuture;
private final List<DATATYPE> dataList = new ArrayList<>();
@@ -35,7 +36,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
public DefaultIncomingData(DataList<DATATYPE> owner) {
assignOwner(owner);
- completionFuture = SettableFuture.create();
+ completionFuture = new CompletableFuture<>();
}
/** Assigns the owner of this. Throws an exception if the owner is already set. */
@@ -50,10 +51,14 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
}
@Override
+ @Deprecated(forRemoval = true, since = "7")
+ @SuppressWarnings("removal")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
/** Returns whether the data in this is complete */
@Override
public synchronized boolean isComplete() {
@@ -92,7 +97,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
@Override
public synchronized void markComplete() {
complete = true;
- completionFuture.set(owner);
+ completionFuture.complete(owner);
}
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
index d589b7dd195..25c230e383f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ForwardingFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.execution.Execution;
@@ -10,6 +8,8 @@ import com.yahoo.processing.request.ErrorMessage;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
@@ -20,9 +20,10 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
-public class FutureResponse extends ForwardingFuture<Response> {
+public class FutureResponse implements Future<Response> {
private final Request request;
+ private final FutureTask<Response> task;
/**
* Only used for generating messages
@@ -31,24 +32,23 @@ public class FutureResponse extends ForwardingFuture<Response> {
private final static Logger log = Logger.getLogger(FutureResponse.class.getName());
- private final ListenableFutureTask<Response> futureTask;
-
public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) {
- this.futureTask = ListenableFutureTask.create(callable);
+ this.task = new FutureTask<>(callable);
this.request = request;
this.execution = execution;
}
- @Override
- public ListenableFutureTask<Response> delegate() {
- return futureTask;
- }
+ public FutureTask<Response> delegate() { return task; }
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) { return task.cancel(mayInterruptIfRunning); }
+ @Override public boolean isCancelled() { return task.isCancelled(); }
+ @Override public boolean isDone() { return task.isDone(); }
public
@Override
Response get() {
try {
- return super.get();
+ return task.get();
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
@@ -61,7 +61,7 @@ public class FutureResponse extends ForwardingFuture<Response> {
@Override
Response get(long timeout, TimeUnit timeunit) {
try {
- return super.get(timeout, timeunit);
+ return task.get(timeout, timeunit);
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
index 371c1bca45f..c8a013fbda9 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
@@ -1,11 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import com.yahoo.concurrent.CompletableFutures;
+import com.yahoo.processing.impl.ProcessingFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -35,6 +37,10 @@ public interface IncomingData<DATATYPE extends Data> {
* <p>
* This return the list owning this for convenience.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> completed();
/**
@@ -108,10 +114,15 @@ public interface IncomingData<DATATYPE extends Data> {
completionFuture = new ImmediateFuture<>(owner);
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
@Override
public DataList<DATATYPE> getOwner() {
return owner;
@@ -178,13 +189,13 @@ public interface IncomingData<DATATYPE extends Data> {
* This is semantically the same as Futures.immediateFuture but contrary to it,
* this never causes any memory synchronization when accessed.
*/
- public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
- private DataList<DATATYPE> owner;
+ private final DataList<DATATYPE> owner;
public ImmediateFuture(DataList<DATATYPE> owner) {
this.owner = owner; // keep here to avoid memory synchronization for access
- set(owner); // Signal completion (for future listeners)
+ complete(owner); // Signal completion (for future listeners)
}
@Override
diff --git a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
index aebbc3f538d..5a672b77762 100644
--- a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
+++ b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.test;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.component.chain.Chain;
import com.yahoo.processing.Processor;
import com.yahoo.processing.Request;
@@ -15,6 +13,7 @@ import com.yahoo.processing.request.ErrorMessage;
import com.yahoo.processing.response.*;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
@@ -288,7 +287,7 @@ public class ProcessorLibrary {
private final boolean ordered, streamed;
/** The incoming data this has created */
- public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+ public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>();
/** Create an instance which returns ordered, streamable data */
public ListenableFutureDataSource() { this(true, true); }
@@ -307,7 +306,7 @@ public class ProcessorLibrary {
dataList = ArrayDataList.createAsyncNonstreamed(request);
else
dataList = ArrayDataList.createAsync(request);
- incomingData.set(dataList.incoming());
+ incomingData.complete(dataList.incoming());
return new Response(dataList);
}
@@ -317,12 +316,12 @@ public class ProcessorLibrary {
public static class RequestCounter extends Processor {
/** The incoming data this has created */
- public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+ public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>();
@Override
public Response process(Request request, Execution execution) {
ArrayDataList dataList = ArrayDataList.createAsync(request);
- incomingData.set(dataList.incoming());
+ incomingData.complete(dataList.incoming());
return new Response(dataList);
}
@@ -354,7 +353,7 @@ public class ProcessorLibrary {
// wait for other executions and merge the responses
for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) {
- additionalResponse.data().complete().get(); // block until we have all the data elements
+ additionalResponse.data().future().get(); // block until we have all the data elements
for (Object item : additionalResponse.data().asList())
response.data().add((Data) item);
response.mergeWith(additionalResponse);
@@ -382,9 +381,10 @@ public class ProcessorLibrary {
public Response process(Request request, Execution execution) {
Response response = execution.process(request);
// TODO: Consider for to best provide helpers for this
- response.data().complete().addListener(new RunnableExecution(request,
- new ExecutionWithResponse(asyncChain, response, execution)),
- MoreExecutors.directExecutor());
+ response.data().future().whenComplete(
+ (__, ___) ->
+ new RunnableExecution(request, new ExecutionWithResponse(asyncChain, response, execution))
+ .run());
return response;
}
diff --git a/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java b/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java
index 0f16aed3d0b..efcf608b6f0 100644
--- a/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java
+++ b/container-core/src/test/java/com/yahoo/processing/ResponseTestCase.java
@@ -22,7 +22,7 @@ public class ResponseTestCase {
* Check the recursive toString printing along the way.
* List variable names ends by numbers specifying the index of the list at each level.
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testRecursiveCompletionAndToString() throws InterruptedException, ExecutionException {
// create lists
diff --git a/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java b/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java
index 40e7384c745..2fb32271419 100644
--- a/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java
+++ b/container-core/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertEquals;
public class FutureDataTestCase {
/** Run a chain which ends in a processor which returns a response containing future data. */
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testFutureDataPassThrough() throws InterruptedException, ExecutionException, TimeoutException {
// Set up
@@ -52,7 +52,7 @@ public class FutureDataTestCase {
}
/** Federate to one source which returns data immediately and one who return future data */
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testFederateSyncAndAsyncData() throws InterruptedException, ExecutionException, TimeoutException {
// Set up
@@ -88,7 +88,7 @@ public class FutureDataTestCase {
}
/** Register a chain which will be called when some async data is available */
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testAsyncDataProcessing() throws InterruptedException, ExecutionException, TimeoutException {
// Set up
@@ -120,7 +120,7 @@ public class FutureDataTestCase {
* When the first of the futures are done one additional chain is to be run.
* When both are done another chain is to be run.
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testAsyncDataProcessingOfFederatedResult() throws InterruptedException, ExecutionException, TimeoutException {
// Set up
diff --git a/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java b/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java
index 1ebf01c5f33..bd1307ff77c 100644
--- a/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java
+++ b/container-core/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java
@@ -13,7 +13,6 @@ import com.yahoo.processing.test.ProcessorLibrary;
import org.junit.Test;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -27,7 +26,7 @@ import static org.junit.Assert.assertEquals;
public class StreamingTestCase {
/** Tests adding a chain which is called every time new data is added to a data list */
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "removal"})
@Test
public void testStreamingData() throws InterruptedException, ExecutionException, TimeoutException {
// Set up
diff --git a/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java b/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java
index 50864c8b034..52f0c457ce2 100644
--- a/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java
+++ b/container-core/src/test/java/com/yahoo/processing/rendering/AsynchronousSectionedRendererTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -407,6 +408,7 @@ public class AsynchronousSectionedRendererTest {
}
@Override
+ @SuppressWarnings("removal")
public ListenableFuture<DataList<StringData>> complete() {
return new ListenableFuture<DataList<StringData>>() {
@Override
@@ -442,6 +444,11 @@ public class AsynchronousSectionedRendererTest {
}
@Override
+ public CompletableFuture<DataList<StringData>> future() {
+ return CompletableFuture.completedFuture(this);
+ }
+
+ @Override
public String getString() {
return list.toString();
}
diff --git a/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java b/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java
index 67a6634b659..21731f7d714 100644
--- a/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java
+++ b/container-core/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java
@@ -3,8 +3,12 @@ package com.yahoo.processing.test.documentation;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.chain.Chain;
-import com.yahoo.processing.*;
-import com.yahoo.processing.execution.*;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.execution.ExecutionWithResponse;
+import com.yahoo.processing.execution.RunnableExecution;
/**
* A processor which registers a listener on the future completion of
@@ -18,6 +22,7 @@ public class AsyncDataProcessingInitiator extends Processor {
this.asyncChain=asyncChain;
}
+ @SuppressWarnings({"removal"})
@Override
public Response process(Request request, Execution execution) {
Response response=execution.process(request);