aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java
blob: 358f8ad969358bfcef7aee987ad22e74d25aa73b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.response;

import com.yahoo.processing.impl.ProcessingFuture;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * A data list own once instance of this which can be used to provide data asynchronously to the list,
 * and consume, wait for or be notified upon the arrival of such data.
 *
 * @author bratseth
 */
public interface IncomingData<DATATYPE extends Data> {

    /**
     * Returns the owner (target DataList) of this.
     * Note that accessing the owner from the thread producing incoming data
     * is generally *not* thread safe.
     */
    DataList<DATATYPE> getOwner();

    /**
     * Returns a future in which all the incoming data that will be produced in this is available.
     * Listeners on this are invoked on the thread producing the incoming data (or a thread spawned from it),
     * which in general is separate from the thread using the data list. Hence, listeners on this even cannot
     * in general assume that they may modify the data list or the request.
     * <p>
     * The data is not {@link #drain drained} into the owner of this by this method. That must be done
     * by the thread using the data list.
     * <p>
     * This return the list owning this for convenience.
     */
    CompletableFuture<DataList<DATATYPE>> completedFuture();

    /**
     * Returns whether this is complete
     */
    boolean isComplete();

    /**
     * Add new data and mark this as completed
     *
     * @throws IllegalStateException if this is already complete or does not allow writes
     */
    void addLast(DATATYPE data);

    /**
     * Add new data without completing this
     *
     * @throws IllegalStateException if this is already complete or does not allow writes
     */
    void add(DATATYPE data);

    /**
     * Add new data and mark this as completed
     *
     * @throws IllegalStateException if this is already complete or does not allow writes
     */
    void addLast(List<DATATYPE> data);

    /**
     * Add new data without completing this.
     *
     * @throws IllegalStateException if this is already complete or does not allow writes
     */
    void add(List<DATATYPE> data);

    /**
     * Mark this as completed and notify any listeners. If this is already complete this method does nothing.
     */
    void markComplete();

    /**
     * Get and remove all the data currently available in this
     */
    List<DATATYPE> drain();

    /**
     * Add a listener which will be invoked every time new data is added to this.
     * This listener may be invoked at any time in any thread, any thread synchronization is left
     * to the listener itself
     */
    void addNewDataListener(Runnable listener, Executor executor);

    /**
     * Creates a null implementation of this which is empty and complete at creation:
     * <ul>
     * <li>Provides immediate return without incurring any memory synchronization for
     * any read method.
     * <li>Throws an exception on any write method
     * </ul>
     * <p>
     * This allows consumers to check for completion the same way whether or not the data list in question
     * supports asynchronous addition of data, and without incurring unnecessary costs.
     */
    final class NullIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> {

        private final DataList<DATATYPE> owner;
        private final ImmediateFuture<DATATYPE> completionFuture;

        public NullIncomingData(DataList<DATATYPE> owner) {
            this.owner = owner;
            completionFuture = new ImmediateFuture<>(owner);
        }

        @Override public CompletableFuture<DataList<DATATYPE>> completedFuture() { return completionFuture; }

        @Override
        public DataList<DATATYPE> getOwner() {
            return owner;
        }

        /**
         * Returns true
         */
        @Override
        public boolean isComplete() {
            return true;
        }

        /**
         * @throws IllegalStateException as this is read only
         */
        public void addLast(DATATYPE data) {
            throw new IllegalStateException(owner + " does not support adding data asynchronously");
        }

        /**
         * @throws IllegalStateException as this is read only
         */
        public void add(DATATYPE data) {
            throw new IllegalStateException(owner + " does not support adding data asynchronously");
        }

        /**
         * @throws IllegalStateException as this is read only
         */
        public void addLast(List<DATATYPE> data) {
            throw new IllegalStateException(owner + " does not support adding data asynchronously");
        }

        /**
         * @throws IllegalStateException as this is read only
         */
        public void add(List<DATATYPE> data) {
            throw new IllegalStateException(owner + " does not support adding data asynchronously");
        }

        /**
         * Do nothing as this is already complete
         */
        public void markComplete() {
        }

        public List<DATATYPE> drain() {
            return Collections.emptyList();
        }

        /**
         * Adds a new data listener to this - this is a no-op
         * as new data can never be added to this implementation.
         */
        public void addNewDataListener(Runnable listener, Executor executor) { }

        public String toString() {
            return "(no incoming)";
        }

        /**
         * A future which is always done and incurs no synchronization.
         * This is semantically the same as Futures.immediateFuture but contrary to it,
         * this never causes any memory synchronization when accessed.
         */
        public static class ImmediateFuture<DATATYPE extends Data> extends ProcessingFuture<DataList<DATATYPE>> {

            private final DataList<DATATYPE> owner;

            public ImmediateFuture(DataList<DATATYPE> owner) {
                this.owner = owner; // keep here to avoid memory synchronization for access
                complete(owner); // Signal completion (for future listeners)
            }

            @Override
            public boolean cancel(boolean b) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public DataList<DATATYPE> get() {
                return owner;
            }

            @Override
            public DataList<DATATYPE> get(long l, TimeUnit timeUnit) {
                return owner;
            }

        }

    }

}