diff options
Diffstat (limited to 'container-core/src/main/java/com/yahoo/processing/Response.java')
-rw-r--r-- | container-core/src/main/java/com/yahoo/processing/Response.java | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/Response.java b/container-core/src/main/java/com/yahoo/processing/Response.java new file mode 100644 index 00000000000..485513cd0cb --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/Response.java @@ -0,0 +1,160 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.processing.execution.ResponseReceiver; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.ArrayDataList; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A Response to a Request. + * <p> + * A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or + * further nested data lists. + * <p> + * Frameworks built on top of processing may subclass this to create a stricter definition of a response. + * Processors producing Responses should not create subclasses but should instead + * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling + * execution.process(request), which will return an empty Response if there are no further processors in the chain. + * <p> + * Do not cache this as it may hold references to objects that should be garbage collected. + * + * @author bratseth + */ +public class Response extends ListenableFreezableClass { + + private final static CompoundName freezeListenerKey =new CompoundName("processing.freezeListener"); + + private final DataList<?> data; + + /** Creates a request containing an empty array data list */ + public Response(Request request) { + this(ArrayDataList.create(request)); + } + + /** Creates a response containing a list of data */ + public Response(DataList<?> data) { + this.data = data; + + Runnable freezeListener = null; + Request request = data.request(); + if (request != null) // subclasses of DataList may not ensure this + freezeListener = (Runnable)request.properties().get(freezeListenerKey); + if (freezeListener != null) { + if (freezeListener instanceof ResponseReceiver) + ((ResponseReceiver)freezeListener).setResponse(this); + data.addFreezeListener(freezeListener, MoreExecutors.directExecutor()); + } + } + + /** + * Convenience constructor which adds the given error message to the given request + */ + public Response(Request request, ErrorMessage errorMessage) { + this(ArrayDataList.create(request)); + request.errors().add(errorMessage); + } + + /** + * Processors which merges another request into this must call this method to notify the response. + * This does not modify the data of either response. + */ + public void mergeWith(Response other) { + } + + /** + * Returns the top level list of data items of this response + */ + public DataList data() { + return data; + } + + // ------ static utilities ---------------------------------------------------------------------------- + + /** + * Returns a future in which the given data list and all lists nested within it are completed. + * The only use of the returned future is to call a get() method on it to complete the given dataList and + * all dataLists nested below it recursively. + * <p> + * Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called + * will not be completed. + * + * @param rootDataList the list to complete recursively + * @return the future in which all data in and below this list is complete, as the given root dataList for convenience + */ + public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) { + List<ListenableFuture<DataList<D>>> futures = new ArrayList<>(); + collectCompletionFutures(rootDataList, futures); + return new CompleteAllOnGetFuture<D>(futures); + } + + @SuppressWarnings("unchecked") + private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) { + futures.add(dataList.complete()); + for (D data : dataList.asList()) { + if (data instanceof DataList) + collectCompletionFutures((DataList<D>) data, futures); + } + } + + /** + * A future which on get calls get on all its given futures and sets the value returned from the + * first given future as its result. + */ + private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> { + + private final List<ListenableFuture<DataList<D>>> futures; + + public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) { + this.futures = new ArrayList<>(futures); + } + + @Override + public DataList<D> get() throws InterruptedException, ExecutionException { + DataList<D> result = null; + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(); + else + future.get(); + } + set(result); + return result; + } + + @Override + public DataList<D> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + DataList<D> result = null; + long timeLeft = unit.toMillis(timeout); + long currentCallStart = SystemTimer.INSTANCE.milliTime(); + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(timeLeft, TimeUnit.MILLISECONDS); + else + future.get(timeLeft, TimeUnit.MILLISECONDS); + long currentCallEnd = SystemTimer.INSTANCE.milliTime(); + timeLeft -= (currentCallEnd - currentCallStart); + if (timeLeft <= 0) break; + currentCallStart = currentCallEnd; + } + set(result); + return result; + } + + } + +} |