aboutsummaryrefslogtreecommitdiffstats
path: root/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
blob: bac514266cbf1988e3930804c82601cc2b7450b1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.processing.rendering;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.processing.Request;
import com.yahoo.processing.Response;
import com.yahoo.processing.execution.Execution;
import com.yahoo.processing.response.AbstractDataList;
import com.yahoo.processing.response.Data;
import com.yahoo.processing.response.DataList;
import com.yahoo.processing.response.Ordered;
import com.yahoo.processing.response.Streamed;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Helper class to implement processing API Response renderers. This renderer
 * will walk the data tree and call the appropriate render methods as it
 * progresses. Nodes with the same parent branch will be rendered in the order
 * in which the data is ready for consumption.
 *
 * <p>
 * This API assumes all data should be rendered. Choosing which data should be
 * rendered is the responsibility of the processing chains.
 * </p>
 *
 * @author Steinar Knutsen
 * @author Einar M R Rosenvinge
 * @author bratseth
 */
public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> extends AsynchronousRenderer<RESPONSE> {

    /**
     * Invoked once at the beginning of rendering a response. This assigns the
     * stream to be used throughput the rendering. Subsequent calls must use the
     * same stream.
     *
     * @param stream the stream to render to in this and all subsequent calls.
     * @throws IOException passed on from the stream
     */
    public abstract void beginResponse(OutputStream stream) throws IOException;

    /**
     * Invoked at the beginning of each data list, including the implicit,
     * outermost one in the response.
     *
     * @throws IOException passed on from the stream
     * @param list the data list which now will be rendered
     */
    public abstract void beginList(DataList<?> list) throws IOException;

    /**
     * Invoked for each leaf node in the data tree
     *
     * @param data the leaf node to render
     * @throws IOException passed on from the stream
     */
    public abstract void data(Data data) throws IOException;

    /**
     * Invoked at the end of each data list, including the implicit, outermost
     * one in the response.
     *
     * @param list the data list which now has no more data items to render
     * @throws IOException passed on from the stream
     */
    public abstract void endList(DataList<?> list) throws IOException;

    /**
     * Invoked once at the end of rendering a response.
     *
     * @throws IOException passed on from the stream
     */
    public abstract void endResponse() throws IOException;

    private static final Logger logger = Logger.getLogger(AsynchronousSectionedRenderer.class.getName());

    // NOTE: Renderers are *prototype objects* - a new instance is created for each rendering by invoking 
    // clone(), init() and then render().
    // Hence any field which is not reinitialized in init() or render() will be *reused* in all rendering operations
    // across all threads!

    /** The stack of listeners to ancestor datalist completions above the current one */
    private Deque<DataListListener> dataListListenerStack;

    private boolean beforeHandoverMode;
    private OutputStream stream;
    private RESPONSE response;
    private Execution execution;
    private boolean clientClosed;

    // This MUST be created in the init() method - see comment above
    private Object singleThreaded;

    // Rendering threads should never block so use one thread per core.
    // We should complete any work we have already started so use an unbounded queue.
    // The executor SHOULD be reused across all instances having the same prototype
    private final Executor renderingExecutor;
    // The executor may either be created (and thus owned) by this, or passed by injection
    private final boolean renderingExecutorIsOwned;

    private static ThreadPoolExecutor createExecutor() {
        int threadCount = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount, 1L, TimeUnit.SECONDS,
                                                             new LinkedBlockingQueue<>(),
                                                             ThreadFactoryFactory.getThreadFactory("rendering"));
        executor.prestartAllCoreThreads();
        return executor;
    }

    private CompletableFuture<Boolean> success;

    private ContentChannel channel;
    private CompletionHandler completionHandler;
    private boolean networkIsInitialized;

    private boolean isInitialized;

    /**
     * Create an renderer instance not yet associated with any request
     * processing or network for easy subclassing. It is the handler's
     * responsibility to wire in the resources needed by a renderer
     * before use.
     */
    public AsynchronousSectionedRenderer() {
        this(null);
    }

    /**
     * Create a renderer using the specified executor instead of the default one which should be used for production.
     * Using a custom executor is useful for tests to avoid creating new threads for each renderer registry.
     * 
     * @param executor the executor to use or null to use the default executor suitable for production
     */
    public AsynchronousSectionedRenderer(Executor executor) {
        isInitialized = false;
        if (executor == null) {
            renderingExecutor = createExecutor();
            renderingExecutorIsOwned = true;
        }
        else {
            renderingExecutor = executor;
            renderingExecutorIsOwned = false;
        }
    }

    /**
     * <p>Render this response using the renderer's own threads and return a future indicating whether the rendering
     * was successful. The data list tree will be traversed asynchronously, and
     * the pertinent methods will be called as data becomes available.</p>
     *
     * <p>If rendering fails, the exception causing this will be wrapped in an
     * ExecutionException and thrown from blocked calls to Future.get()</p>
     *
     * @return a future indicating whether rendering was successful
     */
    @Override
    public final CompletableFuture<Boolean> renderResponse(OutputStream stream, RESPONSE response,
                                                           Execution execution, Request request) {
        if (beforeHandoverMode) { // rendering has already started or is already complete
            beforeHandoverMode = false;
            if ( ! dataListListenerStack.isEmpty() &&
                 dataListListenerStack.getFirst().list.incoming().isComplete()) {
                // We're not waiting for async completion, so kick off more rendering due to the implicit complete
                // (return Response from chain) causing this method to be called
                getExecutor().execute(dataListListenerStack.getFirst());
            }
            return success;
        }
        else { // This is the start of rendering
            return startRender(stream, response, execution, request);
        }
    }

    @Override
    public void deconstruct() {
        super.deconstruct();
        if (renderingExecutorIsOwned && renderingExecutor instanceof ThreadPoolExecutor)
            shutdown((ThreadPoolExecutor) renderingExecutor);
    }
    
    private void shutdown(ThreadPoolExecutor executor) {
        executor.shutdown();
        try {
            if ( ! executor.awaitTermination(30, TimeUnit.SECONDS))
                throw new RuntimeException("Rendering thread pool did not shutdown in 30 seconds");
        }
        catch (InterruptedException e) {
            // return
        }
    }
    
    /**
     * Initiate rendering before handover to rendering threads.
     * This is rendering which happens before the Response is returned from the main chain,
     * caused by freezing of DataLists.
     * At this point the worker thread still owns the Response, so all this rendering must happen
     * on the caller thread invoking freeze (that is, on the thread calling this).
     */
    public final CompletableFuture<Boolean> renderResponseBeforeHandover(OutputStream stream, RESPONSE response,
                                                                         Execution execution, Request request) {
        beforeHandoverMode = true;
        if ( ! isInitialized) throw new IllegalStateException("render() invoked before init().");

        return startRender(stream, response, execution, request);
    }

    private CompletableFuture<Boolean> startRender(OutputStream stream, RESPONSE response,
                                                  Execution execution, Request request) {
        this.response = response;
        this.stream = stream;
        this.execution = execution;
        DataListListener parentOfTopLevelListener = new DataListListener(new ParentOfTopLevel(request, response.data()), null);
        dataListListenerStack.addFirst(parentOfTopLevelListener);
        success = new CompletableFuture<>();
        try {
            getExecutor().execute(parentOfTopLevelListener);
        } catch (RejectedExecutionException e) {
            parentOfTopLevelListener.closeIO(e);
        }
        return success;
    }

    /**
     * Returns the executor in which to execute a listener.
     * Before handover this *must* be the calling thread, because listeners are free to modify the dataList.
     * After handover it can be any thread in the renderer pool.
     * Note that as some listeners may be set up before handover and executed after, it is possible that some rendering
     * inadvertently work ends up in async data producing threads in some cases.
     */
    Executor getExecutor() {
        return beforeHandoverMode ? Runnable::run : renderingExecutor;
    }
    /** For inspection only; use getExecutor() for execution */
    Executor getRenderingExecutor() { return renderingExecutor; }    

    /** The outermost execution which was run to create the response to render. */
    public Execution getExecution() { return execution; }

    /** The response render callbacks are generated from. */
    public Response getResponse() { return response; }

    /** Returns whether the client this is rendering to has closed the connection */
    protected boolean clientClosed() { return clientClosed; }

    /** This hook is called once when the renderer detects that the client has closed the connection */
    protected void onClientClosed() { }

    /**
     * How deep into the tree of nested data lists the callback currently is.
     * beginList() is invoked after this is increased, and endList() is
     * invoked before it is decreased.
     *
     * @return an integer of 1 or above
     */
    public int getRecursionLevel() {
        return dataListListenerStack.size() - 1;
    }

    /**
     * For internal use: Expose JDisc wiring to ensure asynchronous cleanup.
     *
     * @param channel the channel to the client receiving the response
     * @param completionHandler the JDisc completion handler which will be invoked at the end
     *        of the rendering
     * @throws IllegalStateException if attempted invoked more than once
     */
    @Override
    public final void setNetworkWiring(ContentChannel channel, CompletionHandler completionHandler) {
        if (networkIsInitialized)
            throw new IllegalStateException("Network wiring already set and can only be set once.");

        this.channel = channel;
        this.completionHandler = completionHandler;
        networkIsInitialized = true;
    }

    /**
     * Do per instance initialization. If overriding this in a subclass, not
     * invoking it in the subclass' implementation will most likely cause the
     * rendering to fail with an exception.
     */
    @Override
    public void init() {
        beforeHandoverMode = false;
        clientClosed = false;
        singleThreaded = new Object();
        dataListListenerStack = new ArrayDeque<>();
        networkIsInitialized = false;
        isInitialized  = true;
    }

    /**
     * A listener to async completion of a data list.
     * All rendering is done by callbacks to this, even in the sync case (where the callback happens immediately).
     * One such listener is registered for every data list encountered during rendering.
     * Only the last one registered is allowed to run at any point in time, as that one renders the lowest level
     * list not yet completed (rendering, of course is depth first).
     * <p>
     * A stack of registered renderers is maintained to maintain this constraint.
     * <p>
     * A renderer maintains state sufficient to allow it to resume rendering at a later stage.
     * This is to be able to render child lists to completion before completing rendering of the parent list.
     * In addition, this feature is used by DataListeners (see below).
     */
    private class DataListListener extends RendererListener {

        /** The index of the next data item where rendering should be initiated in this list */
        private int currentIndex = 0;

        /** Children of this which has started rendering but not yet completed */
        private int uncompletedChildren = 0;

        private boolean listStartIsRendered = false;

        /** The list which this is listening to */
        private final DataList list;

        /** The listener to the parent of this list, or null if this is the root */
        private final DataListListener parent;

        public DataListListener(DataList list, DataListListener parent) {
            this.list = list;
            this.parent = parent;
        }

        @Override
        protected void render() throws IOException, InterruptedException, ExecutionException {
            if (dataListListenerStack.peekFirst() != this)
                return; // This listens to some ancestor of the current list, do this later
            if (beforeHandoverMode &&  ! list.isFrozen())
                return; // Called on completion of a list which is not frozen yet - hold off until frozen

            if ( ! beforeHandoverMode)
                list.completeFuture().get(); // trigger completion if not done already to invoke any listeners on that event
            boolean startedRendering = renderData();
            if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first
            if (list.completeFuture().isDone()) // might not be when in before handover mode
                endListLevel();
            else
                stream.flush();
        }

        private void endListLevel() throws IOException {
            endRenderLevel(list);
            stream.flush();
            dataListListenerStack.removeFirst();
            if (parent != null)
                parent.childCompleted();
            list.close();
        }

        /** Called each time a direct child of this completed. */
        private void childCompleted() {
            uncompletedChildren--;

            if (uncompletedChildren > 0) return;
            if (list.incoming().isComplete()) // i) if the parent had completed earlier, render it now, see ii)
                run();
        }

        /**
         * Resumes rendering data from the current position.
         * Called both on completion (by this), and when new data is available (from the new data listener).
         *
         * @return whether this started rendering
         */
        @SuppressWarnings("unchecked")
        private boolean renderData() throws IOException {
            if (dataListListenerStack.peekFirst() != this) return false; // This listens to some ancestor of the current list, do this later
            renderDataListStart();

            // Add newly arrived data, and as a consequence run data listeners
            for (Object data : list.incoming().drain())
                list.add((Data) data);

            renderDataList(list);
            return true;
        }

        void renderDataListStart() throws IOException {
            if ( ! listStartIsRendered) {
                if (list instanceof ParentOfTopLevel)
                    beginResponse(stream);
                else
                    beginList(list);
                listStartIsRendered = true;
            }
        }

        /** Renders a list. */
        private void renderDataList(DataList list) throws IOException {
            boolean ordered = isOrdered(list);
            if (list.asList() == null) {
                logger.log(Level.WARNING, "DataList.asList() returned null, indicating it is closed. " +
                                          "This is likely caused by adding the same list multiple " +
                                          "times in the response.");
                return;
            }
            while (currentIndex < list.asList().size()) {
                Data data = list.get(currentIndex++);
                if (data instanceof DataList) {
                    listenTo((DataList)data, ordered && isStreamed((DataList)data));
                    uncompletedChildren++;
                    if (ordered)
                        return; // ii) Resumed by the child list when done, see i)
                }
                else {
                    data(data);
                }
            }
        }

        private void listenTo(DataList subList, boolean listenToNewDataAdded) throws IOException {
            DataListListener listListener = new DataListListener(subList,this);
            dataListListenerStack.addFirst(listListener);

            if (listenToNewDataAdded)
                subList.incoming().addNewDataListener(new DataListener(listListener), getExecutor());

            flushIfLikelyToSuspend(subList);

            subList.addFreezeListener(listListener, getExecutor());
            subList.completeFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
            subList.incoming().completedFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
        }

        private boolean isOrdered(DataList dataList) {
            if (! (dataList instanceof Ordered))
                return true; // all lists are ordered by default
            return ((Ordered)dataList).isOrdered();
        }

        private boolean isStreamed(DataList dataList) {
            if (! (dataList instanceof Streamed))
                return true; // all lists are streamed by default
            return ((Streamed)dataList).isStreamed();
        }

        private void endRenderLevel(DataList<?> current) throws IOException {
            if (current instanceof ParentOfTopLevel) {
                endResponse();
                closeIO(null);
            }
            else {
                endList(current);
            }
        }

        private void closeIO(Exception failed) {
            IOException closeException = null;

            try {
                stream.close();
            } catch (IOException e) {
                closeException = e;
                logger.log(Level.WARNING, "Exception caught while closing stream to client.", e);
            } finally {
                if (failed != null) {
                    success.completeExceptionally(failed);
                } else if (closeException != null) {
                    success.completeExceptionally(closeException);
                } else {
                    success.complete(true);
                }
                if (channel != null) {
                    channel.close(completionHandler);
                }
            }
        }

        @Override
        public String toString() {
            return "listener to " + list;
        }

    }

    /**
     * A data listener is invoked every time new data is available in an incoming list, such that this data
     * can be rendered before completion of the entire list (streaming).
     * <p>
     * One data renderer is registered for every incoming data list.
     * It will delegate to the data list listener of the same list such that the correct rendering progress state is
     * shared between rendering here and from the completion listener.
     */
    private class DataListener extends RendererListener {

        /** The listener to completion of the data list this listens to new data in. */
        private DataListListener dataListListener;

        public DataListener(DataListListener dataListListener) {
            this.dataListListener = dataListListener;
        }

        protected void render() throws IOException {
            dataListListener.renderData();
            flushIfLikelyToSuspend(dataListListener.list);
        }

    }

    private abstract class RendererListener implements Runnable {

        protected abstract void render() throws IOException, InterruptedException, ExecutionException;

        public void run() {
            try {
                synchronized (singleThreaded) {
                    try {
                        render();
                    } catch (Exception e) {
                        Level level = Level.WARNING;
                        if ((e instanceof IOException)) {
                            level = Level.FINE;
                            if ( ! clientClosed) {
                                clientClosed = true;
                                onClientClosed();
                            }
                        }
                        if (logger.isLoggable(level)) {
                            logger.log(level, "Exception caught during response rendering.", e);
                        }
                        if (channel != null) {
                            try {
                                channel.close(completionHandler);
                            } catch (Exception ignored) {
                            }
                        }
                        success.completeExceptionally(e);
                    }
                }
            } catch (Error e) {
                // We are in free-range thread land, and a hanging container is really no fun at all.
                com.yahoo.protect.Process.logAndDie("Caught fatal error during rendering.", e);
            }
        }

        protected void flushIfLikelyToSuspend(DataList list) throws IOException {
            // If the listener is not complete, we will (likely) suspend rendering
            if ( ! list.incoming().isComplete()) stream.flush();
        }

    }

    /**
     * This must be pushed on the stack first to get things started off, given that the stack is expected to
     * contain the parent of each element (including the topmost)
     */
    private static class ParentOfTopLevel extends AbstractDataList {

        private final DataList trueTopLevel;

        public ParentOfTopLevel(Request request, DataList trueTopLevel) {
            super(request);
            this.trueTopLevel = trueTopLevel;
            freeze();
        }

        @Override
        public Data add(Data data) {
            throw new IllegalStateException("We're not supposed to add to this");
        }

        @Override
        public void addDataListener(Runnable listener) {
            throw new IllegalStateException("We're not supposed to listen to or add to this");
        }

        @Override
        public Data get(int index) {
            if (index > 0) throw new IndexOutOfBoundsException();
            return trueTopLevel;
        }

        @Override
        public List<Data> asList() {
            return List.of(trueTopLevel);
        }

        @Override
        public String toString() {
            return "ParentOfTopLevel";
        }

    }

}