aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/processing/Response.java
blob: c6c513cf28af602feceb61efe5f632ede9812c80 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing;

import com.yahoo.component.provider.ListenableFreezableClass;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.processing.execution.ResponseReceiver;
import com.yahoo.processing.impl.ProcessingFuture;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.processing.request.ErrorMessage;
import com.yahoo.processing.response.ArrayDataList;
import com.yahoo.processing.response.Data;
import com.yahoo.processing.response.DataList;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A Response to a Request.
 * <p>
 * A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or
 * further nested data lists.
 * <p>
 * Frameworks built on top of processing may subclass this to create a stricter definition of a response.
 * Processors producing Responses should not create subclasses but should instead
 * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling
 * execution.process(request), which will return an empty Response if there are no further processors in the chain.
 * <p>
 * Do not cache this as it may hold references to objects that should be garbage collected.
 *
 * @author bratseth
 */
public class Response extends ListenableFreezableClass {

    private final static CompoundName freezeListenerKey = CompoundName.from("processing.freezeListener");

    private final DataList<?> data;

    /** Creates a request containing an empty array data list */
    public Response(Request request) {
        this(ArrayDataList.create(request));
    }

    /** Creates a response containing a list of data */
    public Response(DataList<?> data) {
        this.data = data;

        Runnable freezeListener = null;
        Request request = data.request();
        if (request != null) // subclasses of DataList may not ensure this
            freezeListener = (Runnable)request.properties().get(freezeListenerKey);
        if (freezeListener != null) {
            if (freezeListener instanceof ResponseReceiver)
                ((ResponseReceiver)freezeListener).setResponse(this);
            data.addFreezeListener(freezeListener, Runnable::run);
        }
    }

    /**
     * Convenience constructor which adds the given error message to the given request
     */
    public Response(Request request, ErrorMessage errorMessage) {
        this(ArrayDataList.create(request));
        request.errors().add(errorMessage);
    }

    /**
     * Processors which merges another request into this must call this method to notify the response.
     * This does not modify the data of either response.
     */
    public void mergeWith(Response other) {
    }

    /**
     * Returns the top level list of data items of this response
     */
    public DataList data() {
        return data;
    }

    // ------ static utilities ----------------------------------------------------------------------------

    /**
     * Returns a future in which the given data list and all lists nested within it are completed.
     * The only use of the returned future is to call a get() method on it to complete the given dataList and
     * all dataLists nested below it recursively.
     * <p>
     * Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called
     * will not be completed.
     *
     * @param rootDataList the list to complete recursively
     * @return the future in which all data in and below this list is complete, as the given root dataList for convenience
     */
    public static <D extends Data> CompletableFuture<DataList<D>> recursiveFuture(DataList<D> rootDataList) {
        List<CompletableFuture<DataList<D>>> futures = new ArrayList<>();
        collectCompletionFutures(rootDataList, futures);
        return new CompleteAllOnGetFuture<D>(futures);
    }

    @SuppressWarnings("unchecked")
    private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<CompletableFuture<DataList<D>>> futures) {
        futures.add(dataList.completeFuture());
        for (D data : dataList.asList()) {
            if (data instanceof DataList)
                collectCompletionFutures((DataList<D>) data, futures);
        }
    }

    /**
     * A future which on get calls get on all its given futures and sets the value returned from the
     * first given future as its result.
     */
    private static class CompleteAllOnGetFuture<D extends Data> extends ProcessingFuture<DataList<D>> {

        private final List<CompletableFuture<DataList<D>>> futures;

        public CompleteAllOnGetFuture(List<CompletableFuture<DataList<D>>> futures) {
            this.futures = new ArrayList<>(futures);
        }

        @Override
            public DataList<D> get() throws InterruptedException, ExecutionException {
            DataList<D> result = null;
            for (CompletableFuture<DataList<D>> future : futures) {
                if (result == null)
                    result = future.get();
                else
                    future.get();
            }
            complete(result);
            return result;
        }

        @Override
            public DataList<D> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            DataList<D> result = null;
            long timeLeft = unit.toMillis(timeout);
            long currentCallStart = SystemTimer.INSTANCE.milliTime();
            for (CompletableFuture<DataList<D>> future : futures) {
                if (result == null)
                    result = future.get(timeLeft, TimeUnit.MILLISECONDS);
                else
                    future.get(timeLeft, TimeUnit.MILLISECONDS);
                long currentCallEnd = SystemTimer.INSTANCE.milliTime();
                timeLeft -= (currentCallEnd - currentCallStart);
                if (timeLeft <= 0) break;
                currentCallStart = currentCallEnd;
            }
            complete(result);
            return result;
        }

    }

}