summaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/processing/response
diff options
context:
space:
mode:
Diffstat (limited to 'container-core/src/main/java/com/yahoo/processing/response')
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java25
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DataList.java7
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java15
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java24
-rw-r--r--container-core/src/main/java/com/yahoo/processing/response/IncomingData.java21
5 files changed, 57 insertions, 35 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
index 4633ac5ec1c..ef7aea4716f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java
@@ -1,15 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.CompletableFutures;
import com.yahoo.processing.Request;
+import com.yahoo.processing.impl.ProcessingFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,7 +32,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
private final IncomingData<DATATYPE> incomingData;
- private final ListenableFuture<DataList<DATATYPE>> completedFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completedFuture;
/**
* Creates a simple data list which does not allow late incoming data
@@ -94,10 +92,15 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return incomingData;
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> complete() {
- return completedFuture;
+ return CompletableFutures.toGuavaListenableFuture(completedFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completedFuture; }
+
@Override
public boolean isOrdered() { return ordered; }
@@ -108,7 +111,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]");
}
- public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static final class DrainOnGetFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
private final DataList<DATATYPE> owner;
@@ -137,7 +140,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get() throws InterruptedException, ExecutionException {
- return drain(owner.incoming().completed().get());
+ return drain(owner.incoming().future().get());
}
/**
@@ -146,13 +149,13 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable
*/
@Override
public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
- return drain(owner.incoming().completed().get(timeout, timeUnit));
+ return drain(owner.incoming().future().get(timeout, timeUnit));
}
private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) {
for (DATATYPE item : dataList.incoming().drain())
dataList.add(item);
- set(dataList); // Signal completion to listeners
+ complete(dataList); // Signal completion to listeners
return dataList;
}
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
index d566e201375..d30fe4f1a0e 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DataList.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java
@@ -1,11 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletableFuture;
/**
* A list of data items created due to a processing request.
@@ -73,6 +72,10 @@ public interface DataList<DATATYPE extends Data> extends Data {
* Making this call on a list which does not support future data always returns immediately and
* causes no memory synchronization cost.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> complete();
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
index 619e554f45c..0a2bcdc90b0 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
@@ -2,12 +2,13 @@
package com.yahoo.processing.response;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.yahoo.collections.Tuple2;
+import com.yahoo.concurrent.CompletableFutures;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
@@ -19,7 +20,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
private DataList<DATATYPE> owner = null;
- private final SettableFuture<DataList<DATATYPE>> completionFuture;
+ private final CompletableFuture<DataList<DATATYPE>> completionFuture;
private final List<DATATYPE> dataList = new ArrayList<>();
@@ -35,7 +36,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
public DefaultIncomingData(DataList<DATATYPE> owner) {
assignOwner(owner);
- completionFuture = SettableFuture.create();
+ completionFuture = new CompletableFuture<>();
}
/** Assigns the owner of this. Throws an exception if the owner is already set. */
@@ -50,10 +51,14 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
}
@Override
+ @Deprecated(forRemoval = true, since = "7")
+ @SuppressWarnings("removal")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
/** Returns whether the data in this is complete */
@Override
public synchronized boolean isComplete() {
@@ -92,7 +97,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<
@Override
public synchronized void markComplete() {
complete = true;
- completionFuture.set(owner);
+ completionFuture.complete(owner);
}
/**
diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
index d589b7dd195..25c230e383f 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java
@@ -1,8 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.ForwardingFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.execution.Execution;
@@ -10,6 +8,8 @@ import com.yahoo.processing.request.ErrorMessage;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
@@ -20,9 +20,10 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
-public class FutureResponse extends ForwardingFuture<Response> {
+public class FutureResponse implements Future<Response> {
private final Request request;
+ private final FutureTask<Response> task;
/**
* Only used for generating messages
@@ -31,24 +32,23 @@ public class FutureResponse extends ForwardingFuture<Response> {
private final static Logger log = Logger.getLogger(FutureResponse.class.getName());
- private final ListenableFutureTask<Response> futureTask;
-
public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) {
- this.futureTask = ListenableFutureTask.create(callable);
+ this.task = new FutureTask<>(callable);
this.request = request;
this.execution = execution;
}
- @Override
- public ListenableFutureTask<Response> delegate() {
- return futureTask;
- }
+ public FutureTask<Response> delegate() { return task; }
+
+ @Override public boolean cancel(boolean mayInterruptIfRunning) { return task.cancel(mayInterruptIfRunning); }
+ @Override public boolean isCancelled() { return task.isCancelled(); }
+ @Override public boolean isDone() { return task.isDone(); }
public
@Override
Response get() {
try {
- return super.get();
+ return task.get();
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
@@ -61,7 +61,7 @@ public class FutureResponse extends ForwardingFuture<Response> {
@Override
Response get(long timeout, TimeUnit timeunit) {
try {
- return super.get(timeout, timeunit);
+ return task.get(timeout, timeunit);
} catch (InterruptedException e) {
return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
} catch (ExecutionException e) {
diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
index 371c1bca45f..c8a013fbda9 100644
--- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
+++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
@@ -1,11 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;
-import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import com.yahoo.concurrent.CompletableFutures;
+import com.yahoo.processing.impl.ProcessingFuture;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -35,6 +37,10 @@ public interface IncomingData<DATATYPE extends Data> {
* <p>
* This return the list owning this for convenience.
*/
+ CompletableFuture<DataList<DATATYPE>> future();
+
+ /** @deprecated Use {@link #future()} instead */
+ @Deprecated(forRemoval = true, since = "7")
ListenableFuture<DataList<DATATYPE>> completed();
/**
@@ -108,10 +114,15 @@ public interface IncomingData<DATATYPE extends Data> {
completionFuture = new ImmediateFuture<>(owner);
}
+ @Override
+ @SuppressWarnings("removal")
+ @Deprecated(forRemoval = true, since = "7")
public ListenableFuture<DataList<DATATYPE>> completed() {
- return completionFuture;
+ return CompletableFutures.toGuavaListenableFuture(completionFuture);
}
+ @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; }
+
@Override
public DataList<DATATYPE> getOwner() {
return owner;
@@ -178,13 +189,13 @@ public interface IncomingData<DATATYPE extends Data> {
* This is semantically the same as Futures.immediateFuture but contrary to it,
* this never causes any memory synchronization when accessed.
*/
- public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+ public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {
- private DataList<DATATYPE> owner;
+ private final DataList<DATATYPE> owner;
public ImmediateFuture(DataList<DATATYPE> owner) {
this.owner = owner; // keep here to avoid memory synchronization for access
- set(owner); // Signal completion (for future listeners)
+ complete(owner); // Signal completion (for future listeners)
}
@Override