diff options
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java index c1d8972bae4..f6d95e01ed8 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java @@ -2,7 +2,7 @@ package com.yahoo.search.searchchain; import com.yahoo.component.chain.Chain; -import com.yahoo.concurrent.InThreadExecutorService; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; @@ -16,8 +16,11 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * Provides asynchronous execution of searchchains. @@ -49,8 +52,32 @@ import java.util.concurrent.TimeoutException; * @see com.yahoo.search.searchchain.Execution * @author Arne Bergene Fossaa */ -//TODO Make package private as this is intended for use with FederationSearcher only. public class AsyncExecution { + private static final AtomicReference<Executor> deprecatedExecutor = new AtomicReference<>(null); + + private static Executor createDeprecatedExecutor() { + int numCpus = Runtime.getRuntime().availableProcessors(); + ThreadPoolExecutor executor = new ThreadPoolExecutor(2*numCpus, numCpus*10, 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), ThreadFactoryFactory.getThreadFactory("search")); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + return executor; + } + private static Executor getDeprecatedExecutor() { + Executor executor = deprecatedExecutor.get(); + if (executor == null) { + synchronized (deprecatedExecutor) { + executor = deprecatedExecutor.get(); + if (executor == null) { + executor = createDeprecatedExecutor(); + deprecatedExecutor.set(executor); + } + } + } + return executor; + } /** The execution this executes */ private final Execution execution; @@ -116,7 +143,7 @@ public class AsyncExecution { } @Deprecated public FutureResult search(Query query) { - return search(query, new InThreadExecutorService()); + return search(query, getDeprecatedExecutor()); } public FutureResult searchAndFill(Query query, Executor executor) { @@ -128,7 +155,7 @@ public class AsyncExecution { } @Deprecated public FutureResult searchAndFill(Query query) { - return searchAndFill(query, new InThreadExecutorService()); + return searchAndFill(query, getDeprecatedExecutor()); } /** @@ -144,7 +171,7 @@ public class AsyncExecution { } @Deprecated public FutureResult fill(Result result, String summaryClass) { - return fill(result, summaryClass, new InThreadExecutorService()); + return fill(result, summaryClass, getDeprecatedExecutor()); } private static <T> Future<T> getFuture(Executor executor, Callable<T> callable) { @@ -208,7 +235,7 @@ public class AsyncExecution { } @Deprecated public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) { - return waitForAll(tasks, timeoutMs, new InThreadExecutorService()); + return waitForAll(tasks, timeoutMs, getDeprecatedExecutor()); } } |