// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.rendering; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.processing.Request; import com.yahoo.processing.Response; import com.yahoo.processing.execution.Execution; import com.yahoo.processing.response.AbstractDataList; import com.yahoo.processing.response.Data; import com.yahoo.processing.response.DataList; import com.yahoo.processing.response.Ordered; import com.yahoo.processing.response.Streamed; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * Helper class to implement processing API Response renderers. This renderer * will walk the data tree and call the appropriate render methods as it * progresses. Nodes with the same parent branch will be rendered in the order * in which the data is ready for consumption. * *
* This API assumes all data should be rendered. Choosing which data should be * rendered is the responsibility of the processing chains. *
* * @author Steinar Knutsen * @author Einar M R Rosenvinge * @author bratseth */ public abstract class AsynchronousSectionedRendererRender this response using the renderer's own threads and return a future indicating whether the rendering * was successful. The data list tree will be traversed asynchronously, and * the pertinent methods will be called as data becomes available.
* *If rendering fails, the exception causing this will be wrapped in an * ExecutionException and thrown from blocked calls to Future.get()
* * @return a future indicating whether rendering was successful */ @Override public final CompletableFuture* A stack of registered renderers is maintained to maintain this constraint. *
* A renderer maintains state sufficient to allow it to resume rendering at a later stage. * This is to be able to render child lists to completion before completing rendering of the parent list. * In addition, this feature is used by DataListeners (see below). */ private class DataListListener extends RendererListener { /** The index of the next data item where rendering should be initiated in this list */ private int currentIndex = 0; /** Children of this which has started rendering but not yet completed */ private int uncompletedChildren = 0; private boolean listStartIsRendered = false; /** The list which this is listening to */ private final DataList list; /** The listener to the parent of this list, or null if this is the root */ private final DataListListener parent; public DataListListener(DataList list, DataListListener parent) { this.list = list; this.parent = parent; } @Override protected void render() throws IOException, InterruptedException, ExecutionException { if (dataListListenerStack.peekFirst() != this) return; // This listens to some ancestor of the current list, do this later if (beforeHandoverMode && ! list.isFrozen()) return; // Called on completion of a list which is not frozen yet - hold off until frozen if ( ! beforeHandoverMode) list.completeFuture().get(); // trigger completion if not done already to invoke any listeners on that event boolean startedRendering = renderData(); if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first if (list.completeFuture().isDone()) // might not be when in before handover mode endListLevel(); else stream.flush(); } private void endListLevel() throws IOException { endRenderLevel(list); stream.flush(); dataListListenerStack.removeFirst(); if (parent != null) parent.childCompleted(); list.close(); } /** Called each time a direct child of this completed. */ private void childCompleted() { uncompletedChildren--; if (uncompletedChildren > 0) return; if (list.incoming().isComplete()) // i) if the parent had completed earlier, render it now, see ii) run(); } /** * Resumes rendering data from the current position. * Called both on completion (by this), and when new data is available (from the new data listener). * * @return whether this started rendering */ @SuppressWarnings("unchecked") private boolean renderData() throws IOException { if (dataListListenerStack.peekFirst() != this) return false; // This listens to some ancestor of the current list, do this later renderDataListStart(); // Add newly arrived data, and as a consequence run data listeners for (Object data : list.incoming().drain()) list.add((Data) data); renderDataList(list); return true; } void renderDataListStart() throws IOException { if ( ! listStartIsRendered) { if (list instanceof ParentOfTopLevel) beginResponse(stream); else beginList(list); listStartIsRendered = true; } } /** Renders a list. */ private void renderDataList(DataList list) throws IOException { boolean ordered = isOrdered(list); if (list.asList() == null) { logger.log(Level.WARNING, "DataList.asList() returned null, indicating it is closed. " + "This is likely caused by adding the same list multiple " + "times in the response."); return; } while (currentIndex < list.asList().size()) { Data data = list.get(currentIndex++); if (data instanceof DataList) { listenTo((DataList)data, ordered && isStreamed((DataList)data)); uncompletedChildren++; if (ordered) return; // ii) Resumed by the child list when done, see i) } else { data(data); } } } private void listenTo(DataList subList, boolean listenToNewDataAdded) throws IOException { DataListListener listListener = new DataListListener(subList,this); dataListListenerStack.addFirst(listListener); if (listenToNewDataAdded) subList.incoming().addNewDataListener(new DataListener(listListener), getExecutor()); flushIfLikelyToSuspend(subList); subList.addFreezeListener(listListener, getExecutor()); subList.completeFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor()); subList.incoming().completedFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor()); } private boolean isOrdered(DataList dataList) { if (! (dataList instanceof Ordered)) return true; // all lists are ordered by default return ((Ordered)dataList).isOrdered(); } private boolean isStreamed(DataList dataList) { if (! (dataList instanceof Streamed)) return true; // all lists are streamed by default return ((Streamed)dataList).isStreamed(); } private void endRenderLevel(DataList> current) throws IOException { if (current instanceof ParentOfTopLevel) { endResponse(); closeIO(null); } else { endList(current); } } private void closeIO(Exception failed) { IOException closeException = null; try { stream.close(); } catch (IOException e) { closeException = e; logger.log(Level.WARNING, "Exception caught while closing stream to client.", e); } finally { if (failed != null) { success.completeExceptionally(failed); } else if (closeException != null) { success.completeExceptionally(closeException); } else { success.complete(true); } if (channel != null) { channel.close(completionHandler); } } } @Override public String toString() { return "listener to " + list; } } /** * A data listener is invoked every time new data is available in an incoming list, such that this data * can be rendered before completion of the entire list (streaming). *
* One data renderer is registered for every incoming data list. * It will delegate to the data list listener of the same list such that the correct rendering progress state is * shared between rendering here and from the completion listener. */ private class DataListener extends RendererListener { /** The listener to completion of the data list this listens to new data in. */ private DataListListener dataListListener; public DataListener(DataListListener dataListListener) { this.dataListListener = dataListListener; } protected void render() throws IOException { dataListListener.renderData(); flushIfLikelyToSuspend(dataListListener.list); } } private abstract class RendererListener implements Runnable { protected abstract void render() throws IOException, InterruptedException, ExecutionException; public void run() { try { synchronized (singleThreaded) { try { render(); } catch (Exception e) { Level level = Level.WARNING; if ((e instanceof IOException)) { level = Level.FINE; if ( ! clientClosed) { clientClosed = true; onClientClosed(); } } if (logger.isLoggable(level)) { logger.log(level, "Exception caught during response rendering.", e); } if (channel != null) { try { channel.close(completionHandler); } catch (Exception ignored) { } } success.completeExceptionally(e); } } } catch (Error e) { // We are in free-range thread land, and a hanging container is really no fun at all. com.yahoo.protect.Process.logAndDie("Caught fatal error during rendering.", e); } } protected void flushIfLikelyToSuspend(DataList list) throws IOException { // If the listener is not complete, we will (likely) suspend rendering if ( ! list.incoming().isComplete()) stream.flush(); } } /** * This must be pushed on the stack first to get things started off, given that the stack is expected to * contain the parent of each element (including the topmost) */ private static class ParentOfTopLevel extends AbstractDataList { private final DataList trueTopLevel; public ParentOfTopLevel(Request request, DataList trueTopLevel) { super(request); this.trueTopLevel = trueTopLevel; freeze(); } @Override public Data add(Data data) { throw new IllegalStateException("We're not supposed to add to this"); } @Override public void addDataListener(Runnable listener) { throw new IllegalStateException("We're not supposed to listen to or add to this"); } @Override public Data get(int index) { if (index > 0) throw new IndexOutOfBoundsException(); return trueTopLevel; } @Override public List asList() { return List.of(trueTopLevel); } @Override public String toString() { return "ParentOfTopLevel"; } } }