aboutsummaryrefslogtreecommitdiffstats
path: root/container-core
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@vespa.ai>2024-01-11 13:15:33 +0100
committerBjørn Christian Seime <bjorncs@vespa.ai>2024-01-11 13:15:33 +0100
commitf05d25cf5426f805f1baa38bb58ec7fb048effb0 (patch)
treeeb067ba5c22e8666bc103e1fe84f42cb8da02377 /container-core
parent57fedea84830b9c11692f169557a039609dfb4e0 (diff)
Don't fill async rendering executor queue with tasks that blocks and prefer continue rendering in same thread
Diffstat (limited to 'container-core')
-rw-r--r--container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java63
1 files changed, 46 insertions, 17 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
index 1d019078202..50140199eda 100644
--- a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
+++ b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java
@@ -17,7 +17,9 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -334,6 +336,10 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
/** The listener to the parent of this list, or null if this is the root */
private final DataListListener parent;
+ /** Queue of rendering tasks that can be executed immediately without dispatching to executor and incuring a context switch */
+ private final Queue<Runnable> syncTasks = new LinkedList<>();
+
+
public DataListListener(DataList list, DataListListener parent) {
this.list = list;
this.parent = parent;
@@ -341,19 +347,35 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
@Override
protected void render() throws IOException, InterruptedException, ExecutionException {
- if (dataListListenerStack.peekFirst() != this)
- return; // This listens to some ancestor of the current list, do this later
- if (beforeHandoverMode && ! list.isFrozen())
- return; // Called on completion of a list which is not frozen yet - hold off until frozen
-
- if ( ! beforeHandoverMode)
- list.completeFuture().get(); // trigger completion if not done already to invoke any listeners on that event
- boolean startedRendering = renderData();
- if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first
- if (list.completeFuture().isDone()) // might not be when in before handover mode
- endListLevel();
- else
- stream.flush();
+ try {
+ if (dataListListenerStack.peekFirst() != this)
+ return; // This listens to some ancestor of the current list, do this later
+ if (beforeHandoverMode && ! list.isFrozen())
+ return; // Called on completion of a list which is not frozen yet - hold off until frozen
+
+ if ( ! beforeHandoverMode)
+ // Trigger completion if not done already to invoke any listeners on that event.
+ // Note that the completable future might have its get() method overridden.
+ // See DrainOnGetFuture.get() and friends for details.
+ list.completeFuture().get();
+ boolean startedRendering = renderData();
+ if ( ! startedRendering || uncompletedChildren > 0) return; // children must render to completion first
+ if (list.completeFuture().isDone()) // might not be when in before handover mode
+ endListLevel();
+ else
+ stream.flush();
+ } finally {
+ RuntimeException exception = null;
+ while (!syncTasks.isEmpty()) {
+ try {
+ syncTasks.poll().run();
+ } catch (RuntimeException e) {
+ if (exception == null) exception = e;
+ else exception.addSuppressed(e);
+ }
+ }
+ if (exception != null) throw exception;
+ }
}
private void endListLevel() throws IOException {
@@ -426,7 +448,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
}
}
- private void listenTo(DataList subList, boolean listenToNewDataAdded) throws IOException {
+ private void listenTo(DataList<?> subList, boolean listenToNewDataAdded) throws IOException {
DataListListener listListener = new DataListListener(subList,this);
dataListListenerStack.addFirst(listListener);
@@ -435,9 +457,16 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e
flushIfLikelyToSuspend(subList);
- subList.addFreezeListener(listListener, getExecutor());
- subList.completeFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
- subList.incoming().completedFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
+ // Execute listener in the same thread if possible to avoid context switch
+
+ if (subList.isFrozen()) syncTasks.add(listListener);
+ else subList.addFreezeListener(listListener, getExecutor());
+
+ if (subList.completeFuture().isDone()) syncTasks.add(listListener);
+ else subList.completeFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
+
+ if (subList.incoming().completedFuture().isDone()) syncTasks.add(listListener);
+ else subList.incoming().completedFuture().whenCompleteAsync((__, ___) -> listListener.run(), getExecutor());
}
private boolean isOrdered(DataList dataList) {