summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'container-core/src/main/java/com/yahoo')
-rw-r--r--container-core/src/main/java/com/yahoo/processing/Response.java37
-rw-r--r--container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java31
-rw-r--r--container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java11
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java25
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DataList.java7
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java15
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java24
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/IncomingData.java21
-rw-r--r--container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java20
9 files changed, 125 insertions, 66 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/Response.java b/container-core/src/main/java/com/yahoo/processing/Response.java
index 0319a36f2f8..9ab95901320 100644
--- a/container-core/src/main/java/com/yahoo/processing/Response.java
+++ b/container-core/src/main/java/com/yahoo/processing/Response.java
@@ -1,12 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.processing.execution.ResponseReceiver;
+import com.yahoo.processing.impl.ProcessingFuture;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.processing.request.ErrorMessage;
import com.yahoo.processing.response.ArrayDataList;
@@ -15,8 +15,8 @@ import com.yahoo.processing.response.DataList;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class Response extends ListenableFreezableClass {
if (freezeListener != null) {
if (freezeListener instanceof ResponseReceiver)
((ResponseReceiver)freezeListener).setResponse(this);
- data.addFreezeListener(freezeListener, MoreExecutors.directExecutor());
+ data.addFreezeListener(freezeListener, Runnable::run);
}
}
@@ -96,15 +96,22 @@ public class Response extends ListenableFreezableClass {
* @param rootDataList the list to complete recursively
* @return the future in which all data in and below this list is complete, as the given root dataList for convenience
*/
- public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) {
- List<ListenableFuture<DataList<D>>> futures = new ArrayList<>();
+ public static <D extends Data> CompletableFuture<DataList<D>> recursiveFuture(DataList<D> rootDataList) {
+ List<CompletableFuture<DataList<D>>> futures = new ArrayList<>();
collectCompletionFutures(rootDataList, futures);
return new CompleteAllOnGetFuture<D>(futures);
}
+ /** @deprecated Use {@link #recursiveFuture(DataList)} instead */
+ @Deprecated(forRemoval = true, since = "7")
+ @SuppressWarnings("removal")
+ public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) {
+ return CompletableFutures.toGuavaListenableFuture(recursiveFuture(rootDataList));
+ }
+
@SuppressWarnings("unchecked")
- private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) {
- futures.add(dataList.complete());
+ private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<CompletableFuture<DataList<D>>> futures) {
+ futures.add(dataList.future());
for (D data : dataList.asList()) {
if (data instanceof DataList)
collectCompletionFutures((DataList<D>) data, futures);
@@ -115,24 +122,24 @@ public class Response extends ListenableFreezableClass {
* A future which on get calls get on all its given futures and sets the value returned from the
* first given future as its result.
*/
- private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> {
+ private static class CompleteAllOnGetFuture<D extends Data> extends ProcessingFuture<DataList<D>> {
- private final List<ListenableFuture<DataList<D>>> futures;
+ private final List<CompletableFuture<DataList<D>>> futures;
- public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) {
+ public CompleteAllOnGetFuture(List<CompletableFuture<DataList<D>>> futures) {
this.futures = new ArrayList<>(futures);
}
@Override
public DataList<D> get() throws InterruptedException, ExecutionException {
DataList<D> result = null;
- for (ListenableFuture<DataList<D>> future : futures) {
+ for (CompletableFuture<DataList<D>> future : futures) {
if (result == null)
result = future.get();
else
future.get();
}
- set(result);
+ complete(result);
return result;
}
@@ -141,7 +148,7 @@ public class Response extends ListenableFreezableClass {
DataList<D> result = null;
long timeLeft = unit.toMillis(timeout);
long currentCallStart = SystemTimer.INSTANCE.milliTime();
- for (ListenableFuture<DataList<D>> future : futures) {
+ for (CompletableFuture<DataList<D>> future : futures) {
if (result == null)
result = future.get(timeLeft, TimeUnit.MILLISECONDS);
else
@@ -151,7 +158,7 @@ public class Response extends ListenableFreezableClass {
if (timeLeft <= 0) break;
currentCallStart = currentCallEnd;
}
- set(result);
+ complete(result);
return result;
}
diff --git a/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java
new file mode 100644
index 00000000000..ab597fffaff
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/processing/impl/ProcessingFuture.java
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.impl;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link CompletableFuture} where {@link #get()}/{@link #get(long, TimeUnit)} may have side-effects (e.g trigger the underlying computation).
+ *
+ * @author bjorncs
+ */
+// TODO Vespa 8 remove ListenableFuture implementation
+public abstract class ProcessingFuture<V> extends CompletableFuture<V> implements ListenableFuture<V> {
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+ @Override public boolean isCancelled() { return false; }
+
+ @Override public abstract V get() throws InterruptedException, ExecutionException;
+ @Override public abstract V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ whenCompleteAsync((__, ___) -> listener.run(), executor);
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
index 21375ee3d76..92df00711f3 100644
--- a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
+++ b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
@@ -2,7 +2,6 @@
package com.yahoo.processing.rendering;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jdisc.handler.CompletionHandler;
@@ -257,7 +256,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
* inadvertently work ends up in async data producing threads in some cases.
*/
Executor getExecutor() {
- return beforeHandoverMode ? MoreExecutors.directExecutor() : renderingExecutor;
+ return beforeHandoverMode ? Runnable::run : renderingExecutor;
}
/** For inspection only; use getExecutor() for execution */
Executor getRenderingExecutor() { return renderingExecutor; }
@@ -360,10 +359,10 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
return; // Called on completion of a list which is not frozen yet - hold off until frozen
if ( ! beforeHandoverMode)
- list.complete().get(); // trigger completion if not done already to invoke any listeners on that event
+ list.future().get(); // trigger completion if not done already to invoke any listeners on that event
boolean startedRendering = renderData();
if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first
- if (list.complete().isDone()) // might not be when in before handover mode
+ if (list.future().isDone()) // might not be when in before handover mode
endListLevel();
else
stream.flush();
@@ -445,8 +444,8 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
flushIfLikelyToSuspend(subList);
subList.addFreezeListener(listListener, getExecutor());
- subList.complete().addListener(listListener, getExecutor());
- subList.incoming().completed().addListener(listListener, getExecutor());
+ subList.future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
+ subList.incoming().future().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
}
private boolean isOrdered(DataList dataList) {
diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
index 4633ac5ec1c..ef7aea4716f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
@@ -1,15 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.processing.Request;
+import com.yahoo.processing.impl.ProcessingFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,7 +32,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
private final IncomingData<DATATYPE> incomingData;
- private final ListenableFuture<DataList<DATATYPE>> completedFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completedFuture;
/**
* Creates a simple data list which does not allow late incoming data
@@ -94,10 +92,15 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return incomingData;
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> complete() {
- return completedFuture;
+ return CompletableFutures.toGuavaListenableFuture(completedFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completedFuture; }
+
@Override
public boolean isOrdered() { return ordered; }
@@ -108,7 +111,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]");
}
- public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static final class DrainOnGetFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
private final DataList<DATATYPE> owner;
@@ -137,7 +140,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get() throws InterruptedException, ExecutionException {
- return drain(owner.incoming().completed().get());
+ return drain(owner.incoming().future().get());
}
/**
@@ -146,13 +149,13 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
- return drain(owner.incoming().completed().get(timeout, timeUnit));
+ return drain(owner.incoming().future().get(timeout, timeUnit));
}
private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) {
for (DATATYPE item : dataList.incoming().drain())
dataList.add(item);
- set(dataList); // Signal completion to listeners
+ complete(dataList); // Signal completion to listeners
return dataList;
}
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
index d566e201375..d30fe4f1a0e 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
@@ -1,11 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletableFuture;
/**
* A list of data items created due to a processing request.
@@ -73,6 +72,10 @@ public interface DataList<DATATYPE extends Data> extends Data {
* Making this call on a list which does not support future data always returns immediately and
* causes no memory synchronization cost.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> complete();
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
index 619e554f45c..0a2bcdc90b0 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
@@ -2,12 +2,13 @@
package com.yahoo.processing.response;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.collections.Tuple2;
+import com.yahoo.concurrent.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -19,7 +20,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
private DataList<DATATYPE> owner = null;
- private final SettableFuture<DataList<DATATYPE>> completionFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completionFuture;
private final List<DATATYPE> dataList = new ArrayList<>();
@@ -35,7 +36,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
public DefaultIncomingData(DataList<DATATYPE> owner) {
assignOwner(owner);
- completionFuture = SettableFuture.create();
+ completionFuture = new CompletableFuture<>();
}
/** Assigns the owner of this. Throws an exception if the owner is already set. */
@@ -50,10 +51,14 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
}
@Override
+ @Deprecated(forRemoval = true, since = "7")
+ @SuppressWarnings("removal")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
/** Returns whether the data in this is complete */
@Override
public synchronized boolean isComplete() {
@@ -92,7 +97,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
@Override
public synchronized void markComplete() {
complete = true;
- completionFuture.set(owner);
+ completionFuture.complete(owner);
}
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
index d589b7dd195..25c230e383f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ForwardingFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.execution.Execution;
@@ -10,6 +8,8 @@ import com.yahoo.processing.request.ErrorMessage;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
@@ -20,9 +20,10 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
-public class FutureResponse extends ForwardingFuture<Response> {
+public class FutureResponse implements Future<Response> {
private final Request request;
+ private final FutureTask<Response> task;
/**
* Only used for generating messages
@@ -31,24 +32,23 @@ public class FutureResponse extends ForwardingFuture<Response> {
private final static Logger log = Logger.getLogger(FutureResponse.class.getName());
- private final ListenableFutureTask<Response> futureTask;
-
public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) {
- this.futureTask = ListenableFutureTask.create(callable);
+ this.task = new FutureTask<>(callable);
this.request = request;
this.execution = execution;
}
- @Override
- public ListenableFutureTask<Response> delegate() {
- return futureTask;
- }
+ public FutureTask<Response> delegate() { return task; }
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) { return task.cancel(mayInterruptIfRunning); }
+ @Override public boolean isCancelled() { return task.isCancelled(); }
+ @Override public boolean isDone() { return task.isDone(); }
public
@Override
Response get() {
try {
- return super.get();
+ return task.get();
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
@@ -61,7 +61,7 @@ public class FutureResponse extends ForwardingFuture<Response> {
@Override
Response get(long timeout, TimeUnit timeunit) {
try {
- return super.get(timeout, timeunit);
+ return task.get(timeout, timeunit);
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
index 371c1bca45f..c8a013fbda9 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
@@ -1,11 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import com.yahoo.concurrent.CompletableFutures;
+import com.yahoo.processing.impl.ProcessingFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -35,6 +37,10 @@ public interface IncomingData<DATATYPE extends Data> {
* <p>
* This return the list owning this for convenience.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> completed();
/**
@@ -108,10 +114,15 @@ public interface IncomingData<DATATYPE extends Data> {
completionFuture = new ImmediateFuture<>(owner);
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
@Override
public DataList<DATATYPE> getOwner() {
return owner;
@@ -178,13 +189,13 @@ public interface IncomingData<DATATYPE extends Data> {
* This is semantically the same as Futures.immediateFuture but contrary to it,
* this never causes any memory synchronization when accessed.
*/
- public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
- private DataList<DATATYPE> owner;
+ private final DataList<DATATYPE> owner;
public ImmediateFuture(DataList<DATATYPE> owner) {
this.owner = owner; // keep here to avoid memory synchronization for access
- set(owner); // Signal completion (for future listeners)
+ complete(owner); // Signal completion (for future listeners)
}
@Override
diff --git a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
index aebbc3f538d..5a672b77762 100644
--- a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
+++ b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.test;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.component.chain.Chain;
import com.yahoo.processing.Processor;
import com.yahoo.processing.Request;
@@ -15,6 +13,7 @@ import com.yahoo.processing.request.ErrorMessage;
import com.yahoo.processing.response.*;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
@@ -288,7 +287,7 @@ public class ProcessorLibrary {
private final boolean ordered, streamed;
/** The incoming data this has created */
- public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+ public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>();
/** Create an instance which returns ordered, streamable data */
public ListenableFutureDataSource() { this(true, true); }
@@ -307,7 +306,7 @@ public class ProcessorLibrary {
dataList = ArrayDataList.createAsyncNonstreamed(request);
else
dataList = ArrayDataList.createAsync(request);
- incomingData.set(dataList.incoming());
+ incomingData.complete(dataList.incoming());
return new Response(dataList);
}
@@ -317,12 +316,12 @@ public class ProcessorLibrary {
public static class RequestCounter extends Processor {
/** The incoming data this has created */
- public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+ public final CompletableFuture<IncomingData> incomingData = new CompletableFuture<>();
@Override
public Response process(Request request, Execution execution) {
ArrayDataList dataList = ArrayDataList.createAsync(request);
- incomingData.set(dataList.incoming());
+ incomingData.complete(dataList.incoming());
return new Response(dataList);
}
@@ -354,7 +353,7 @@ public class ProcessorLibrary {
// wait for other executions and merge the responses
for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) {
- additionalResponse.data().complete().get(); // block until we have all the data elements
+ additionalResponse.data().future().get(); // block until we have all the data elements
for (Object item : additionalResponse.data().asList())
response.data().add((Data) item);
response.mergeWith(additionalResponse);
@@ -382,9 +381,10 @@ public class ProcessorLibrary {
public Response process(Request request, Execution execution) {
Response response = execution.process(request);
// TODO: Consider for to best provide helpers for this
- response.data().complete().addListener(new RunnableExecution(request,
- new ExecutionWithResponse(asyncChain, response, execution)),
- MoreExecutors.directExecutor());
+ response.data().future().whenComplete(
+ (__, ___) ->
+ new RunnableExecution(request, new ExecutionWithResponse(asyncChain, response, execution))
+ .run());
return response;
}