diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-12-08 15:00:07 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-12-08 15:00:07 +0100 |
commit | 4f11c490da2bee48a40e80cfbbfda9914457043c (patch) | |
tree | f2f784afe2c80bc04b2b32b5b588ac65f5643226 /container-core/src/main/java/com/yahoo/processing/response | |
parent | 92fb7c5a6b124a9f384ed54cb5be166c14a5e910 (diff) |
Deprecate methods using Guava ListenableFuture
- com.yahoo.processing.Response.recursiveComplete()
- com.yahoo.processing.response.DataList.complete()
- com.yahoo.processing.response.IncomingData.completed()
Also fixes bug in FutureResponse where `get()` does not return value produced by async task.
Diffstat (limited to 'container-core/src/main/java/com/yahoo/processing/response')
5 files changed, 57 insertions, 35 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java index 4633ac5ec1c..ef7aea4716f 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java +++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java @@ -1,15 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.CompletableFutures; import com.yahoo.processing.Request; +import com.yahoo.processing.impl.ProcessingFuture; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,7 +32,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ private final IncomingData<DATATYPE> incomingData; - private final ListenableFuture<DataList<DATATYPE>> completedFuture; + private final CompletableFuture<DataList<DATATYPE>> completedFuture; /** * Creates a simple data list which does not allow late incoming data @@ -94,10 +92,15 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable return incomingData; } + @Override + @SuppressWarnings("removal") + @Deprecated(forRemoval = true, since = "7") public ListenableFuture<DataList<DATATYPE>> complete() { - return completedFuture; + return CompletableFutures.toGuavaListenableFuture(completedFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completedFuture; } + @Override public boolean isOrdered() { return ordered; } @@ -108,7 +111,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]"); } - public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + public static final class DrainOnGetFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> { private final DataList<DATATYPE> owner; @@ -137,7 +140,7 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ @Override public DataList<DATATYPE> get() throws InterruptedException, ExecutionException { - return drain(owner.incoming().completed().get()); + return drain(owner.incoming().future().get()); } /** @@ -146,13 +149,13 @@ public abstract class AbstractDataList<DATATYPE extends Data> extends Listenable */ @Override public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - return drain(owner.incoming().completed().get(timeout, timeUnit)); + return drain(owner.incoming().future().get(timeout, timeUnit)); } private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) { for (DATATYPE item : dataList.incoming().drain()) dataList.add(item); - set(dataList); // Signal completion to listeners + complete(dataList); // Signal completion to listeners return dataList; } diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java index d566e201375..d30fe4f1a0e 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/DataList.java +++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java @@ -1,11 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.ExecutionList; import com.google.common.util.concurrent.ListenableFuture; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.CompletableFuture; /** * A list of data items created due to a processing request. @@ -73,6 +72,10 @@ public interface DataList<DATATYPE extends Data> extends Data { * Making this call on a list which does not support future data always returns immediately and * causes no memory synchronization cost. */ + CompletableFuture<DataList<DATATYPE>> future(); + + /** @deprecated Use {@link #future()} instead */ + @Deprecated(forRemoval = true, since = "7") ListenableFuture<DataList<DATATYPE>> complete(); /** diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java index 619e554f45c..0a2bcdc90b0 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java +++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java @@ -2,12 +2,13 @@ package com.yahoo.processing.response; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.yahoo.collections.Tuple2; +import com.yahoo.concurrent.CompletableFutures; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** @@ -19,7 +20,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< private DataList<DATATYPE> owner = null; - private final SettableFuture<DataList<DATATYPE>> completionFuture; + private final CompletableFuture<DataList<DATATYPE>> completionFuture; private final List<DATATYPE> dataList = new ArrayList<>(); @@ -35,7 +36,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< public DefaultIncomingData(DataList<DATATYPE> owner) { assignOwner(owner); - completionFuture = SettableFuture.create(); + completionFuture = new CompletableFuture<>(); } /** Assigns the owner of this. Throws an exception if the owner is already set. */ @@ -50,10 +51,14 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< } @Override + @Deprecated(forRemoval = true, since = "7") + @SuppressWarnings("removal") public ListenableFuture<DataList<DATATYPE>> completed() { - return completionFuture; + return CompletableFutures.toGuavaListenableFuture(completionFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; } + /** Returns whether the data in this is complete */ @Override public synchronized boolean isComplete() { @@ -92,7 +97,7 @@ public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData< @Override public synchronized void markComplete() { complete = true; - completionFuture.set(owner); + completionFuture.complete(owner); } /** diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java index d589b7dd195..25c230e383f 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java +++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.ForwardingFuture; -import com.google.common.util.concurrent.ListenableFutureTask; import com.yahoo.processing.Request; import com.yahoo.processing.Response; import com.yahoo.processing.execution.Execution; @@ -10,6 +8,8 @@ import com.yahoo.processing.request.ErrorMessage; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; @@ -20,9 +20,10 @@ import java.util.logging.Logger; * * @author bratseth */ -public class FutureResponse extends ForwardingFuture<Response> { +public class FutureResponse implements Future<Response> { private final Request request; + private final FutureTask<Response> task; /** * Only used for generating messages @@ -31,24 +32,23 @@ public class FutureResponse extends ForwardingFuture<Response> { private final static Logger log = Logger.getLogger(FutureResponse.class.getName()); - private final ListenableFutureTask<Response> futureTask; - public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) { - this.futureTask = ListenableFutureTask.create(callable); + this.task = new FutureTask<>(callable); this.request = request; this.execution = execution; } - @Override - public ListenableFutureTask<Response> delegate() { - return futureTask; - } + public FutureTask<Response> delegate() { return task; } + + @Override public boolean cancel(boolean mayInterruptIfRunning) { return task.cancel(mayInterruptIfRunning); } + @Override public boolean isCancelled() { return task.isCancelled(); } + @Override public boolean isDone() { return task.isDone(); } public @Override Response get() { try { - return super.get(); + return task.get(); } catch (InterruptedException e) { return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); } catch (ExecutionException e) { @@ -61,7 +61,7 @@ public class FutureResponse extends ForwardingFuture<Response> { @Override Response get(long timeout, TimeUnit timeunit) { try { - return super.get(timeout, timeunit); + return task.get(timeout, timeunit); } catch (InterruptedException e) { return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); } catch (ExecutionException e) { diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java index 371c1bca45f..c8a013fbda9 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java +++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java @@ -1,11 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; -import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.CompletableFutures; +import com.yahoo.processing.impl.ProcessingFuture; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -35,6 +37,10 @@ public interface IncomingData<DATATYPE extends Data> { * <p> * This return the list owning this for convenience. */ + CompletableFuture<DataList<DATATYPE>> future(); + + /** @deprecated Use {@link #future()} instead */ + @Deprecated(forRemoval = true, since = "7") ListenableFuture<DataList<DATATYPE>> completed(); /** @@ -108,10 +114,15 @@ public interface IncomingData<DATATYPE extends Data> { completionFuture = new ImmediateFuture<>(owner); } + @Override + @SuppressWarnings("removal") + @Deprecated(forRemoval = true, since = "7") public ListenableFuture<DataList<DATATYPE>> completed() { - return completionFuture; + return CompletableFutures.toGuavaListenableFuture(completionFuture); } + @Override public CompletableFuture<DataList<DATATYPE>> future() { return completionFuture; } + @Override public DataList<DATATYPE> getOwner() { return owner; @@ -178,13 +189,13 @@ public interface IncomingData<DATATYPE extends Data> { * This is semantically the same as Futures.immediateFuture but contrary to it, * this never causes any memory synchronization when accessed. */ - public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> { - private DataList<DATATYPE> owner; + private final DataList<DATATYPE> owner; public ImmediateFuture(DataList<DATATYPE> owner) { this.owner = owner; // keep here to avoid memory synchronization for access - set(owner); // Signal completion (for future listeners) + complete(owner); // Signal completion (for future listeners) } @Override |