diff options
author | Jon Bratseth <bratseth@gmail.com> | 2021-10-08 13:52:48 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2021-10-08 13:52:48 +0200 |
commit | 475f0c93523daa1c3c53786ee8a9a9aee6702204 (patch) | |
tree | d20932871aa4c917052f2e37dde77c5d4a880e39 /container-search/src/main/java/com | |
parent | 9d435e48c742b3efc94c68c02da835c5f5298255 (diff) |
Use the executor from the context
Diffstat (limited to 'container-search/src/main/java/com')
6 files changed, 23 insertions, 70 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index e541cc8b5c0..c28e18a37d5 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -30,7 +30,6 @@ import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; - /** * Superclass for backend searchers. * diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java index 4b9d469d90d..98c3ac9e3ac 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java @@ -90,37 +90,28 @@ public class FederationSearcher extends ForkingSearcher { private final boolean strictSearchchain; private final TargetSelector<?> targetSelector; private final Clock clock = Clock.systemUTC(); - private final Executor executor; - - @Inject public FederationSearcher(FederationConfig config, StrictContractsConfig strict, - ComponentRegistry<TargetSelector> targetSelectors, Executor executor) { + ComponentRegistry<TargetSelector> targetSelectors) { this(createResolver(config), strict.searchchains(), strict.propagateSourceProperties(), - resolveSelector(config.targetSelector(), targetSelectors), executor); + resolveSelector(config.targetSelector(), targetSelectors)); } // for testing - FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver, Executor executor) { - this(searchChainResolver, false, PropagateSourceProperties.EVERY, null, executor); - } - // for testing public FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver) { - this(id, searchChainResolver, new InThreadExecutorService()); + this(searchChainResolver, false, PropagateSourceProperties.EVERY, null); } private FederationSearcher(SearchChainResolver searchChainResolver, boolean strictSearchchain, PropagateSourceProperties.Enum propagateSourceProperties, - TargetSelector targetSelector, - Executor executor) { + TargetSelector targetSelector) { this.searchChainResolver = searchChainResolver; sourceRefResolver = new SourceRefResolver(searchChainResolver); this.strictSearchchain = strictSearchchain; this.propagateSourceProperties = propagateSourceProperties; this.targetSelector = targetSelector; - this.executor = executor; } private static TargetSelector resolveSelector(String selectorId, @@ -255,7 +246,7 @@ public class FederationSearcher extends ForkingSearcher { if (timeout <= 0) return new FutureResult(() -> new Result(query, ErrorMessage.createTimeout("Timed out before federation")), execution, query); Query clonedQuery = cloneFederationQuery(query, window, timeout, target); - return new AsyncExecution(target.getChain(), execution).search(clonedQuery, executor); + return new AsyncExecution(target.getChain(), execution).search(clonedQuery); } private Query cloneFederationQuery(Query query, Window window, long timeout, Target target) { @@ -446,7 +437,7 @@ public class FederationSearcher extends ForkingSearcher { propagateErrors(resultToFill, result); } else { AsyncExecution asyncFill = new AsyncExecution(chainExecution); - futureFilledResults.add(new Pair<>(resultToFill, asyncFill.fill(resultToFill, summaryClass, executor))); + futureFilledResults.add(new Pair<>(resultToFill, asyncFill.fill(resultToFill, summaryClass))); } } } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java index 1b2eeb5efe6..ac879183125 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java @@ -53,31 +53,6 @@ import java.util.concurrent.atomic.AtomicReference; * @author Arne Bergene Fossaa */ public class AsyncExecution { - private static final AtomicReference<Executor> deprecatedExecutor = new AtomicReference<>(null); - - private static Executor createDeprecatedExecutor() { - int numCpus = Runtime.getRuntime().availableProcessors(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(2*numCpus, numCpus*10, 1L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), ThreadFactoryFactory.getThreadFactory("search")); - // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also - // get the dreaded thread locals initialized even if they will never run. - // That counters what we we want to achieve with the Q that will prefer thread locality. - executor.prestartAllCoreThreads(); - return executor; - } - private static Executor getDeprecatedExecutor() { - Executor executor = deprecatedExecutor.get(); - if (executor == null) { - synchronized (deprecatedExecutor) { - executor = deprecatedExecutor.get(); - if (executor == null) { - executor = createDeprecatedExecutor(); - deprecatedExecutor.set(executor); - } - } - } - return executor; - } /** The execution this executes */ private final Execution execution; @@ -138,41 +113,29 @@ public class AsyncExecution { * * @see com.yahoo.search.searchchain.Execution */ - public FutureResult search(Query query, Executor executor) { - return getFutureResult(executor, () -> execution.search(query), query); - } - @Deprecated public FutureResult search(Query query) { - return search(query, getDeprecatedExecutor()); + return getFutureResult(execution.context().executor(), () -> execution.search(query), query); } - public FutureResult searchAndFill(Query query, Executor executor) { - return getFutureResult(executor, () -> { + public FutureResult searchAndFill(Query query) { + return getFutureResult(execution.context().executor(), () -> { Result result = execution.search(query); execution.fill(result, query.getPresentation().getSummary()); return result; }, query); } - @Deprecated - public FutureResult searchAndFill(Query query) { - return searchAndFill(query, getDeprecatedExecutor()); - } /** * The future of this functions returns the original Result * * @see com.yahoo.search.searchchain.Execution */ - public FutureResult fill(Result result, String summaryClass, Executor executor) { - return getFutureResult(executor, () -> { + public FutureResult fill(Result result, String summaryClass) { + return getFutureResult(execution.context().executor(), () -> { execution.fill(result, summaryClass); return result; }, result.getQuery()); } - @Deprecated - public FutureResult fill(Result result, String summaryClass) { - return fill(result, summaryClass, getDeprecatedExecutor()); - } private static <T> Future<T> getFuture(Executor executor, Callable<T> callable) { FutureTask<T> future = new FutureTask<>(callable); @@ -206,14 +169,15 @@ public class AsyncExecution { * done when the timeout expires, it will be cancelled, and it will return a * result. All unfinished Futures will be cancelled. * - * @return the list of results in the same order as returned from the task - * collection + * @return the list of results in the same order as returned from the task collection */ - public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs, Executor executor) { + public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) { + if (tasks.isEmpty()) return new ArrayList<>(); + // Copy the list in case it is modified while we are waiting List<FutureResult> workingTasks = new ArrayList<>(tasks); try { - runTask(executor, () -> { + runTask(tasks.stream().findAny().get().getExecution().context().executor(), () -> { for (FutureResult task : workingTasks) task.get(timeoutMs, TimeUnit.MILLISECONDS); }).get(timeoutMs, TimeUnit.MILLISECONDS); @@ -233,9 +197,5 @@ public class AsyncExecution { } return results; } - @Deprecated - public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) { - return waitForAll(tasks, timeoutMs, getDeprecatedExecutor()); - } } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java b/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java index 86ba8664dee..fc2d44e01d9 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java @@ -18,6 +18,7 @@ import com.yahoo.search.rendering.RendererRegistry; import com.yahoo.search.statistics.TimeTracker; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; /** * <p>An execution of a search chain. This keeps track of the call state for an execution (in the calling thread) @@ -122,7 +123,7 @@ public class Execution extends com.yahoo.processing.execution.Execution { this.tokenRegistry = tokenRegistry; this.rendererRegistry = rendererRegistry; this.linguistics = linguistics; - this.executor = executor != null ? executor : Runnable::run; // Run in same thread if no executor is provided + this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); } /** @deprecated pass an executor */ @@ -362,7 +363,7 @@ public class Execution extends com.yahoo.processing.execution.Execution { /** * Returns the executor that should be used to execute tasks as part of this execution. - * This is never null but will be an executor that runs in the same thread if none is passed to this. + * This is never null but will be an executor that runs a single thread if none is passed to this. */ public Executor executor() { return executor; } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java index 210a77ccf57..f6977849a02 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java @@ -66,7 +66,7 @@ public class ExecutionFactory extends AbstractComponent { this.specialTokens = new SpecialTokenRegistry(specialTokens); this.linguistics = linguistics; this.rendererRegistry = new RendererRegistry(renderers.allComponents()); - this.executor = executor != null ? executor : Runnable::run; + this.executor = executor != null ? executor : Executors.newSingleThreadExecutor(); } /** @deprecated pass the container threadpool */ diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java index 81d98f828fd..64bbcb4780c 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java @@ -24,7 +24,6 @@ public class FutureResult extends FutureTask<Result> { private final Query query; - /** Only used for generating messages */ private final Execution execution; private final static Logger log = Logger.getLogger(FutureResult.class.getName()); @@ -90,6 +89,9 @@ public class FutureResult extends FutureTask<Result> { return query; } + /** Returns the execution which creates this */ + public Execution getExecution() { return execution; } + private ErrorMessage createInterruptedError(Exception e) { return ErrorMessage.createUnspecifiedError(execution + " was interrupted while executing: " + Exceptions.toMessageString(e)); |