diff options
Diffstat (limited to 'jdisc_core/src/main/java/com')
7 files changed, 77 insertions, 76 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentOutputStream.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentOutputStream.java index 54e50df5a25..e001db2ab81 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentOutputStream.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentOutputStream.java @@ -1,12 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.ListenableFuture; - import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -20,7 +19,7 @@ import java.util.concurrent.TimeoutException; * * @author Simon Thoresen Hult */ -public class FastContentOutputStream extends AbstractContentOutputStream implements ListenableFuture<Boolean> { +public class FastContentOutputStream extends AbstractContentOutputStream implements Future<Boolean> { private final FastContentWriter out; @@ -78,7 +77,6 @@ public class FastContentOutputStream extends AbstractContentOutputStream impleme return out.get(timeout, unit); } - @Override public void addListener(Runnable listener, Executor executor) { out.addListener(listener, executor); } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentWriter.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentWriter.java index 596ae07f1d5..7c278c67d59 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentWriter.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FastContentWriter.java @@ -1,16 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Objects; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -25,13 +20,12 @@ import java.util.concurrent.atomic.AtomicInteger; * * @author Simon Thoresen Hult */ -public class FastContentWriter implements ListenableFuture<Boolean>, AutoCloseable { +public class FastContentWriter extends CompletableFuture<Boolean> implements AutoCloseable { private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicInteger numPendingCompletions = new AtomicInteger(); private final CompletionHandler completionHandler = new SimpleCompletionHandler(); private final ContentChannel out; - private final SettableFuture<Boolean> future = SettableFuture.create(); /** * <p>Creates a new FastContentWriter that encapsulates a given {@link ContentChannel}.</p> @@ -87,7 +81,7 @@ public class FastContentWriter implements ListenableFuture<Boolean>, AutoCloseab try { out.write(buf, completionHandler); } catch (Throwable t) { - future.setException(t); + completeExceptionally(t); throw t; } } @@ -103,14 +97,13 @@ public class FastContentWriter implements ListenableFuture<Boolean>, AutoCloseab try { out.close(completionHandler); } catch (Throwable t) { - future.setException(t); + completeExceptionally(t); throw t; } } - @Override public void addListener(Runnable listener, Executor executor) { - future.addListener(listener, executor); + whenCompleteAsync((__, ___) -> listener.run(), executor); } @Override @@ -123,34 +116,19 @@ public class FastContentWriter implements ListenableFuture<Boolean>, AutoCloseab return false; } - @Override - public boolean isDone() { - return future.isDone(); - } - - @Override - public Boolean get() throws InterruptedException, ExecutionException { - return future.get(); - } - - @Override - public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return future.get(timeout, unit); - } - private class SimpleCompletionHandler implements CompletionHandler { @Override public void completed() { numPendingCompletions.decrementAndGet(); if (closed.get() && numPendingCompletions.get() == 0) { - future.set(true); + complete(true); } } @Override public void failed(Throwable t) { - future.setException(t); + completeExceptionally(t); } } } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureCompletion.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureCompletion.java index ab989b89b1f..a188be6145f 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureCompletion.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureCompletion.java @@ -1,7 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.AbstractFuture; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * <p>This class provides an implementation of {@link CompletionHandler} that allows you to wait for either {@link @@ -13,16 +14,16 @@ import com.google.common.util.concurrent.AbstractFuture; * * @author Simon Thoresen Hult */ -public final class FutureCompletion extends AbstractFuture<Boolean> implements CompletionHandler { +public final class FutureCompletion extends CompletableFuture<Boolean> implements CompletionHandler { @Override public void completed() { - set(true); + complete(true); } @Override public void failed(Throwable t) { - setException(t); + completeExceptionally(t); } @Override @@ -34,4 +35,6 @@ public final class FutureCompletion extends AbstractFuture<Boolean> implements C public final boolean isCancelled() { return false; } + + public void addListener(Runnable r, Executor e) { whenCompleteAsync((__, ___) -> r.run(), e); } } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureConjunction.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureConjunction.java index 9d0204f33c5..ba304d9e2de 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureConjunction.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureConjunction.java @@ -1,39 +1,46 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.concurrent.CompletableFutures; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * <p>This class implements a Future<Boolean> that is conjunction of zero or more other Future<Boolean>s, * i.e. it evaluates to <code>true</code> if, and only if, all its operands evaluate to <code>true</code>. To use this class, - * simply create an instance of it and add operands to it using the {@link #addOperand(ListenableFuture)} method.</p> - * TODO: consider rewriting usage of FutureConjunction to use CompletableFuture instead. + * simply create an instance of it and add operands to it using the {@link #addOperand(CompletableFuture)} method.</p> * * @author Simon Thoresen Hult */ -final class FutureConjunction implements ListenableFuture<Boolean> { +final class FutureConjunction implements Future<Boolean> { - private final List<ListenableFuture<Boolean>> operands = new LinkedList<>(); + private final List<CompletableFuture<Boolean>> operands = new LinkedList<>(); /** - * <p>Adds a ListenableFuture<Boolean> to this conjunction. This can be called at any time, even after having called + * <p>Adds a {@link CompletableFuture} to this conjunction. This can be called at any time, even after having called * {@link #get()} previously.</p> * * @param operand The operand to add to this conjunction. */ - public void addOperand(ListenableFuture<Boolean> operand) { + public void addOperand(CompletableFuture<Boolean> operand) { operands.add(operand); } - @Override public void addListener(Runnable listener, Executor executor) { - Futures.allAsList(operands).addListener(listener, executor); + CompletableFutures.allOf(operands) + .whenCompleteAsync((__, ___) -> listener.run(), executor); + } + + CompletableFuture<Boolean> completableFuture() { + return CompletableFutures.allOf(operands) + .thenApply(ops -> ops.stream().allMatch(bool -> bool)); } @Override diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureResponse.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureResponse.java index b8073865667..2284c563f50 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureResponse.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/FutureResponse.java @@ -1,16 +1,18 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.AbstractFuture; import com.yahoo.jdisc.Response; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + /** * This class provides an implementation of {@link ResponseHandler} that allows you to wait for a {@link Response} to * be returned. * * @author Simon Thoresen Hult */ -public final class FutureResponse extends AbstractFuture<Response> implements ResponseHandler { +public final class FutureResponse extends CompletableFuture<Response> implements ResponseHandler { private final ResponseHandler handler; @@ -38,6 +40,8 @@ public final class FutureResponse extends AbstractFuture<Response> implements Re }); } + public void addListener(Runnable r, Executor e) { whenCompleteAsync((__, ___) -> r.run(), e); } + /** * <p>Constructs a new FutureResponse that calls the given {@link ResponseHandler} when {@link * #handleResponse(Response)} is invoked.</p> @@ -50,7 +54,7 @@ public final class FutureResponse extends AbstractFuture<Response> implements Re @Override public ContentChannel handleResponse(Response response) { - set(response); + complete(response); return handler.handleResponse(response); } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/RequestDispatch.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/RequestDispatch.java index c85aa6375af..c1457290904 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/RequestDispatch.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/RequestDispatch.java @@ -1,19 +1,20 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.jdisc.References; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.SharedResource; -import com.yahoo.jdisc.References; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.concurrent.*; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * <p>This class provides a convenient way of safely dispatching a {@link Request}. Using this class you do not have to @@ -46,7 +47,7 @@ import java.util.List; * * @author Simon Thoresen Hult */ -public abstract class RequestDispatch implements ListenableFuture<Response>, ResponseHandler { +public abstract class RequestDispatch implements Future<Response>, ResponseHandler { private final FutureConjunction completions = new FutureConjunction(); private final FutureResponse futureResponse = new FutureResponse(this); @@ -106,22 +107,26 @@ public abstract class RequestDispatch implements ListenableFuture<Response>, Res * * @return A Future that can be waited for. */ - public final ListenableFuture<Response> dispatch() { + public final CompletableFuture<Response> dispatch() { try (FastContentWriter writer = new FastContentWriter(connect())) { for (ByteBuffer buf : requestContent()) { writer.write(buf); } completions.addOperand(writer); } - return this; + return CompletableFuture.allOf(completions.completableFuture(), futureResponse) + .thenApply(__ -> { + try { + return futureResponse.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); // Should not happens since both futures are complete + } + }); } - @Override public void addListener(Runnable listener, Executor executor) { - List<ListenableFuture<?>> combined = new ArrayList<>(2); - combined.add(completions); - combined.add(futureResponse); - Futures.allAsList(combined).addListener(listener, executor); + CompletableFuture.allOf(completions.completableFuture(), futureResponse) + .whenCompleteAsync((__, ___) -> listener.run(), executor); } @Override diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ResponseDispatch.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ResponseDispatch.java index 377c8ecf4a9..9387171c1ac 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ResponseDispatch.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/ResponseDispatch.java @@ -1,15 +1,17 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; -import com.google.common.util.concurrent.ForwardingListenableFuture; -import com.google.common.util.concurrent.ListenableFuture; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.SharedResource; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * <p>This class provides a convenient way of safely dispatching a {@link Response}. It is similar in use to {@link @@ -34,7 +36,7 @@ import java.util.concurrent.Future; * * @author Simon Thoresen Hult */ -public abstract class ResponseDispatch extends ForwardingListenableFuture<Boolean> { +public abstract class ResponseDispatch implements Future<Boolean> { private final FutureConjunction completions = new FutureConjunction(); @@ -90,19 +92,14 @@ public abstract class ResponseDispatch extends ForwardingListenableFuture<Boolea * @param responseHandler The ResponseHandler to dispatch to. * @return A Future that can be waited for. */ - public final ListenableFuture<Boolean> dispatch(ResponseHandler responseHandler) { + public final CompletableFuture<Boolean> dispatch(ResponseHandler responseHandler) { try (FastContentWriter writer = new FastContentWriter(connect(responseHandler))) { for (ByteBuffer buf : responseContent()) { writer.write(buf); } completions.addOperand(writer); } - return this; - } - - @Override - protected final ListenableFuture<Boolean> delegate() { - return completions; + return completions.completableFuture(); } @Override @@ -115,6 +112,15 @@ public abstract class ResponseDispatch extends ForwardingListenableFuture<Boolea return false; } + @Override public boolean isDone() { return completions.isDone(); } + + @Override public Boolean get() throws InterruptedException, ExecutionException { return completions.get(); } + + @Override + public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return completions.get(timeout, unit); + } + /** * <p>Factory method for creating a ResponseDispatch with a {@link Response} that has the given status code, and * ByteBuffer content.</p> |