diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-06-13 18:14:55 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-13 18:14:55 +0200 |
commit | 1b4e1acb733d494d9f7a4a0450f41806ceb14184 (patch) | |
tree | f359651a9e9d1231bdacc9e0fc1f4f1a489e790f | |
parent | 6f9a74cdc71e36130c907f706293f3f3e33615f8 (diff) | |
parent | bdfab3d8f9aa0e913b0d5300a2053eb7f0884823 (diff) |
Merge pull request #27410 from vespa-engine/bratseth/bound-rendering-tasks
Bratseth/bound rendering tasks
10 files changed, 72 insertions, 35 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java index d5dc18c1e5e..037506bed7f 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ContentChannelOutputStream.java @@ -84,7 +84,7 @@ public class ContentChannelOutputStream extends OutputStream implements Writable */ @Override public void write(byte[] b) throws IOException { - nonCopyingWrite(Arrays.copyOf(b, b.length)); + nonCopyingWrite(Arrays.copyOf(b, b.length)); } /** @@ -142,10 +142,13 @@ public class ContentChannelOutputStream extends OutputStream implements Writable } private class LoggingCompletionHandler implements CompletionHandler { + private final CompletionHandler nested; + LoggingCompletionHandler(CompletionHandler nested) { this.nested = nested; } + @Override public void completed() { if (nested != null) { diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java index 2749b73272d..bac514266cb 100644 --- a/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java +++ b/container-core/src/main/java/com/yahoo/processing/rendering/AsynchronousSectionedRenderer.java @@ -16,7 +16,6 @@ import com.yahoo.processing.response.Streamed; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -143,7 +142,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e } /** - * Create an renderer using the specified executor instead of the default one which should be used for production. + * Create a renderer using the specified executor instead of the default one which should be used for production. * Using a custom executor is useful for tests to avoid creating new threads for each renderer registry. * * @param executor the executor to use or null to use the default executor suitable for production @@ -226,7 +225,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e this.response = response; this.stream = stream; this.execution = execution; - DataListListener parentOfTopLevelListener = new DataListListener(new ParentOfTopLevel(request,response.data()), null); + DataListListener parentOfTopLevelListener = new DataListListener(new ParentOfTopLevel(request, response.data()), null); dataListListenerStack.addFirst(parentOfTopLevelListener); success = new CompletableFuture<>(); try { @@ -264,13 +263,13 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e /** * How deep into the tree of nested data lists the callback currently is. - * beginList() is invoked after this this is increased, and endList() is + * beginList() is invoked after this is increased, and endList() is * invoked before it is decreased. * * @return an integer of 1 or above */ public int getRecursionLevel() { - return dataListListenerStack.size()-1; + return dataListListenerStack.size() - 1; } /** @@ -406,7 +405,7 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e /** Renders a list. */ private void renderDataList(DataList list) throws IOException { - final boolean ordered = isOrdered(list); + boolean ordered = isOrdered(list); if (list.asList() == null) { logger.log(Level.WARNING, "DataList.asList() returned null, indicating it is closed. " + "This is likely caused by adding the same list multiple " + @@ -565,9 +564,9 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e */ private static class ParentOfTopLevel extends AbstractDataList { - private DataList trueTopLevel; + private final DataList trueTopLevel; - public ParentOfTopLevel(Request request,DataList trueTopLevel) { + public ParentOfTopLevel(Request request, DataList trueTopLevel) { super(request); this.trueTopLevel = trueTopLevel; freeze(); @@ -585,13 +584,13 @@ public abstract class AsynchronousSectionedRenderer<RESPONSE extends Response> e @Override public Data get(int index) { - if (index>0) throw new IndexOutOfBoundsException(); + if (index > 0) throw new IndexOutOfBoundsException(); return trueTopLevel; } @Override public List<Data> asList() { - return Collections.<Data>singletonList(trueTopLevel); + return List.of(trueTopLevel); } @Override diff --git a/container-core/src/main/java/com/yahoo/processing/rendering/Renderer.java b/container-core/src/main/java/com/yahoo/processing/rendering/Renderer.java index df53ac846f2..97ca0bfb6a4 100644 --- a/container-core/src/main/java/com/yahoo/processing/rendering/Renderer.java +++ b/container-core/src/main/java/com/yahoo/processing/rendering/Renderer.java @@ -53,7 +53,7 @@ public abstract class Renderer<RESPONSE extends Response> extends AbstractCompon * @return a {@link CompletableFuture} containing a boolean where true indicates a successful rendering */ public abstract CompletableFuture<Boolean> renderResponse(OutputStream stream, RESPONSE response, - Execution execution, Request request); + Execution execution, Request request); /** * Name of the output encoding, if applicable. diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java index fc0e6d21e13..358f8ad9693 100644 --- a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java +++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java @@ -100,7 +100,7 @@ public interface IncomingData<DATATYPE extends Data> { */ final class NullIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> { - private DataList<DATATYPE> owner; + private final DataList<DATATYPE> owner; private final ImmediateFuture<DATATYPE> completionFuture; public NullIncomingData(DataList<DATATYPE> owner) { diff --git a/container-search/src/main/java/com/yahoo/search/handler/HttpSearchResponse.java b/container-search/src/main/java/com/yahoo/search/handler/HttpSearchResponse.java index c6906ea7566..bc730f18986 100644 --- a/container-search/src/main/java/com/yahoo/search/handler/HttpSearchResponse.java +++ b/container-search/src/main/java/com/yahoo/search/handler/HttpSearchResponse.java @@ -91,8 +91,7 @@ public class HttpSearchResponse extends ExtendedResponse { @Override public void render(OutputStream output, ContentChannel networkChannel, CompletionHandler handler) throws IOException { - if (rendererCopy instanceof AsynchronousSectionedRenderer) { - AsynchronousSectionedRenderer<Result> renderer = (AsynchronousSectionedRenderer<Result>) rendererCopy; + if (rendererCopy instanceof AsynchronousSectionedRenderer<Result> renderer) { renderer.setNetworkWiring(networkChannel, handler); } try { diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java index 87880ce2445..dc044c77c94 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java @@ -86,7 +86,7 @@ public class ExecutionFactory extends AbstractComponent { this.schemaInfo = schemaInfo; this.specialTokens = new SpecialTokenRegistry(specialTokens); this.linguistics = linguistics; - this.renderingExecutor = createRenderingExecutor(); + this.renderingExecutor = new RenderingExecutorFactory().createExecutor(); this.rendererRegistry = new RendererRegistry(renderers.allComponents(), renderingExecutor); this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); } @@ -151,13 +151,4 @@ public class ExecutionFactory extends AbstractComponent { null); } - private static ThreadPoolExecutor createRenderingExecutor() { - int threadCount = Runtime.getRuntime().availableProcessors(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(threadCount, threadCount, 1L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - ThreadFactoryFactory.getThreadFactory("common-rendering")); - executor.prestartAllCoreThreads(); - return executor; - } - } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/RenderingExecutorFactory.java b/container-search/src/main/java/com/yahoo/search/searchchain/RenderingExecutorFactory.java new file mode 100644 index 00000000000..f67db059470 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/searchchain/RenderingExecutorFactory.java @@ -0,0 +1,39 @@ +package com.yahoo.search.searchchain; + +import com.yahoo.concurrent.ThreadFactoryFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Factory of the executor passed to renderers by default. + * + * @author bratseth + */ +class RenderingExecutorFactory { + + private final int maxQueuedRenderingTasksPerProcessor; + private final int availableProcessors; + + public RenderingExecutorFactory() { + this.maxQueuedRenderingTasksPerProcessor = 100; + this.availableProcessors = Runtime.getRuntime().availableProcessors(); + } + + ThreadPoolExecutor createExecutor() { + int maxOutstandingTasks = maxQueuedRenderingTasksPerProcessor * availableProcessors; + ThreadPoolExecutor executor = new ThreadPoolExecutor(availableProcessors, availableProcessors, 1L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(maxOutstandingTasks), + ThreadFactoryFactory.getThreadFactory("common-rendering"), + (task, exec) -> renderingRejected(maxOutstandingTasks)); + executor.prestartAllCoreThreads(); + return executor; + } + + private void renderingRejected(int maxOutstandingTasks) { + throw new RejectedExecutionException("More than " + maxOutstandingTasks + " rendering tasks queued, rejecting this"); + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java index a2478046a9e..4db74cc37a0 100644 --- a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java +++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java @@ -11,6 +11,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class ThreadFactoryFactory { + static private final Map<String, PooledFactory> factory = new HashMap<>(); + static public synchronized ThreadFactory getThreadFactory(String name) { PooledFactory p = factory.get(name); if (p == null) { @@ -30,16 +32,21 @@ public class ThreadFactoryFactory { } private static class PooledFactory { + + private final String name; + private final AtomicInteger poolId = new AtomicInteger(1); + private static class Factory implements ThreadFactory { + final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; final boolean isDaemon; @SuppressWarnings("removal") - Factory(final String name, boolean isDaemon) { + Factory(String name, boolean isDaemon) { this.isDaemon = isDaemon; - final SecurityManager s = System.getSecurityManager(); + SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); @@ -47,8 +54,8 @@ public class ThreadFactoryFactory { } @Override - public Thread newThread(final Runnable r) { - final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon() != isDaemon) { t.setDaemon(isDaemon); } @@ -58,16 +65,15 @@ public class ThreadFactoryFactory { return t; } } + PooledFactory(String name) { this.name = name; } + ThreadFactory getFactory(boolean isDaemon) { return new Factory(name + "-" + poolId.getAndIncrement() + "-thread-", isDaemon); - } - private final String name; - private final AtomicInteger poolId = new AtomicInteger(1); + } - static private final Map<String, PooledFactory> factory = new HashMap<>(); } diff --git a/vespajlib/src/main/java/com/yahoo/io/BufferChain.java b/vespajlib/src/main/java/com/yahoo/io/BufferChain.java index cee7a3c25dd..8fbf13e32ba 100644 --- a/vespajlib/src/main/java/com/yahoo/io/BufferChain.java +++ b/vespajlib/src/main/java/com/yahoo/io/BufferChain.java @@ -127,7 +127,7 @@ public final class BufferChain { } public void flush() throws IOException { - for (final ByteBuffer b : buffers) { + for (ByteBuffer b : buffers) { endpoint.send(b); } buffers.clear(); diff --git a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java index 78551b5a578..3b0c1b55af7 100644 --- a/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java +++ b/vespajlib/src/main/java/com/yahoo/io/WritableByteTransmitter.java @@ -5,7 +5,7 @@ import java.io.IOException; import java.nio.ByteBuffer; /** - * Marker interface for use with the BufferChain data store. + * For use with the BufferChain data store. * * @author Steinar Knutsen */ |