// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing; import com.yahoo.component.provider.ListenableFreezableClass; import com.yahoo.concurrent.SystemTimer; import com.yahoo.processing.execution.ResponseReceiver; import com.yahoo.processing.impl.ProcessingFuture; import com.yahoo.processing.request.CompoundName; import com.yahoo.processing.request.ErrorMessage; import com.yahoo.processing.response.ArrayDataList; import com.yahoo.processing.response.Data; import com.yahoo.processing.response.DataList; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * A Response to a Request. *

* A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or * further nested data lists. *

* Frameworks built on top of processing may subclass this to create a stricter definition of a response. * Processors producing Responses should not create subclasses but should instead * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling * execution.process(request), which will return an empty Response if there are no further processors in the chain. *

* Do not cache this as it may hold references to objects that should be garbage collected. * * @author bratseth */ public class Response extends ListenableFreezableClass { private final static CompoundName freezeListenerKey = CompoundName.from("processing.freezeListener"); private final DataList data; /** Creates a request containing an empty array data list */ public Response(Request request) { this(ArrayDataList.create(request)); } /** Creates a response containing a list of data */ public Response(DataList data) { this.data = data; Runnable freezeListener = null; Request request = data.request(); if (request != null) // subclasses of DataList may not ensure this freezeListener = (Runnable)request.properties().get(freezeListenerKey); if (freezeListener != null) { if (freezeListener instanceof ResponseReceiver) ((ResponseReceiver)freezeListener).setResponse(this); data.addFreezeListener(freezeListener, Runnable::run); } } /** * Convenience constructor which adds the given error message to the given request */ public Response(Request request, ErrorMessage errorMessage) { this(ArrayDataList.create(request)); request.errors().add(errorMessage); } /** * Processors which merges another request into this must call this method to notify the response. * This does not modify the data of either response. */ public void mergeWith(Response other) { } /** * Returns the top level list of data items of this response */ public DataList data() { return data; } // ------ static utilities ---------------------------------------------------------------------------- /** * Returns a future in which the given data list and all lists nested within it are completed. * The only use of the returned future is to call a get() method on it to complete the given dataList and * all dataLists nested below it recursively. *

* Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called * will not be completed. * * @param rootDataList the list to complete recursively * @return the future in which all data in and below this list is complete, as the given root dataList for convenience */ public static CompletableFuture> recursiveFuture(DataList rootDataList) { List>> futures = new ArrayList<>(); collectCompletionFutures(rootDataList, futures); return new CompleteAllOnGetFuture(futures); } @SuppressWarnings("unchecked") private static void collectCompletionFutures(DataList dataList, List>> futures) { futures.add(dataList.completeFuture()); for (D data : dataList.asList()) { if (data instanceof DataList) collectCompletionFutures((DataList) data, futures); } } /** * A future which on get calls get on all its given futures and sets the value returned from the * first given future as its result. */ private static class CompleteAllOnGetFuture extends ProcessingFuture> { private final List>> futures; public CompleteAllOnGetFuture(List>> futures) { this.futures = new ArrayList<>(futures); } @Override public DataList get() throws InterruptedException, ExecutionException { DataList result = null; for (CompletableFuture> future : futures) { if (result == null) result = future.get(); else future.get(); } complete(result); return result; } @Override public DataList get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { DataList result = null; long timeLeft = unit.toMillis(timeout); long currentCallStart = SystemTimer.INSTANCE.milliTime(); for (CompletableFuture> future : futures) { if (result == null) result = future.get(timeLeft, TimeUnit.MILLISECONDS); else future.get(timeLeft, TimeUnit.MILLISECONDS); long currentCallEnd = SystemTimer.INSTANCE.milliTime(); timeLeft -= (currentCallEnd - currentCallStart); if (timeLeft <= 0) break; currentCallStart = currentCallEnd; } complete(result); return result; } } }