aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2021-10-08 13:52:48 +0200
committerJon Bratseth <bratseth@gmail.com>2021-10-08 13:52:48 +0200
commit475f0c93523daa1c3c53786ee8a9a9aee6702204 (patch)
treed20932871aa4c917052f2e37dde77c5d4a880e39 /container-search/src/main/java/com
parent9d435e48c742b3efc94c68c02da835c5f5298255 (diff)
Use the executor from the context
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java21
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java60
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/Execution.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java4
6 files changed, 23 insertions, 70 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index e541cc8b5c0..c28e18a37d5 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -30,7 +30,6 @@ import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
-
/**
* Superclass for backend searchers.
*
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
index 4b9d469d90d..98c3ac9e3ac 100644
--- a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
@@ -90,37 +90,28 @@ public class FederationSearcher extends ForkingSearcher {
private final boolean strictSearchchain;
private final TargetSelector<?> targetSelector;
private final Clock clock = Clock.systemUTC();
- private final Executor executor;
-
-
@Inject
public FederationSearcher(FederationConfig config, StrictContractsConfig strict,
- ComponentRegistry<TargetSelector> targetSelectors, Executor executor) {
+ ComponentRegistry<TargetSelector> targetSelectors) {
this(createResolver(config), strict.searchchains(), strict.propagateSourceProperties(),
- resolveSelector(config.targetSelector(), targetSelectors), executor);
+ resolveSelector(config.targetSelector(), targetSelectors));
}
// for testing
- FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver, Executor executor) {
- this(searchChainResolver, false, PropagateSourceProperties.EVERY, null, executor);
- }
- // for testing
public FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver) {
- this(id, searchChainResolver, new InThreadExecutorService());
+ this(searchChainResolver, false, PropagateSourceProperties.EVERY, null);
}
private FederationSearcher(SearchChainResolver searchChainResolver,
boolean strictSearchchain,
PropagateSourceProperties.Enum propagateSourceProperties,
- TargetSelector targetSelector,
- Executor executor) {
+ TargetSelector targetSelector) {
this.searchChainResolver = searchChainResolver;
sourceRefResolver = new SourceRefResolver(searchChainResolver);
this.strictSearchchain = strictSearchchain;
this.propagateSourceProperties = propagateSourceProperties;
this.targetSelector = targetSelector;
- this.executor = executor;
}
private static TargetSelector resolveSelector(String selectorId,
@@ -255,7 +246,7 @@ public class FederationSearcher extends ForkingSearcher {
if (timeout <= 0)
return new FutureResult(() -> new Result(query, ErrorMessage.createTimeout("Timed out before federation")), execution, query);
Query clonedQuery = cloneFederationQuery(query, window, timeout, target);
- return new AsyncExecution(target.getChain(), execution).search(clonedQuery, executor);
+ return new AsyncExecution(target.getChain(), execution).search(clonedQuery);
}
private Query cloneFederationQuery(Query query, Window window, long timeout, Target target) {
@@ -446,7 +437,7 @@ public class FederationSearcher extends ForkingSearcher {
propagateErrors(resultToFill, result);
} else {
AsyncExecution asyncFill = new AsyncExecution(chainExecution);
- futureFilledResults.add(new Pair<>(resultToFill, asyncFill.fill(resultToFill, summaryClass, executor)));
+ futureFilledResults.add(new Pair<>(resultToFill, asyncFill.fill(resultToFill, summaryClass)));
}
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
index 1b2eeb5efe6..ac879183125 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
@@ -53,31 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
* @author Arne Bergene Fossaa
*/
public class AsyncExecution {
- private static final AtomicReference<Executor> deprecatedExecutor = new AtomicReference<>(null);
-
- private static Executor createDeprecatedExecutor() {
- int numCpus = Runtime.getRuntime().availableProcessors();
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2*numCpus, numCpus*10, 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(false), ThreadFactoryFactory.getThreadFactory("search"));
- // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also
- // get the dreaded thread locals initialized even if they will never run.
- // That counters what we we want to achieve with the Q that will prefer thread locality.
- executor.prestartAllCoreThreads();
- return executor;
- }
- private static Executor getDeprecatedExecutor() {
- Executor executor = deprecatedExecutor.get();
- if (executor == null) {
- synchronized (deprecatedExecutor) {
- executor = deprecatedExecutor.get();
- if (executor == null) {
- executor = createDeprecatedExecutor();
- deprecatedExecutor.set(executor);
- }
- }
- }
- return executor;
- }
/** The execution this executes */
private final Execution execution;
@@ -138,41 +113,29 @@ public class AsyncExecution {
*
* @see com.yahoo.search.searchchain.Execution
*/
- public FutureResult search(Query query, Executor executor) {
- return getFutureResult(executor, () -> execution.search(query), query);
- }
- @Deprecated
public FutureResult search(Query query) {
- return search(query, getDeprecatedExecutor());
+ return getFutureResult(execution.context().executor(), () -> execution.search(query), query);
}
- public FutureResult searchAndFill(Query query, Executor executor) {
- return getFutureResult(executor, () -> {
+ public FutureResult searchAndFill(Query query) {
+ return getFutureResult(execution.context().executor(), () -> {
Result result = execution.search(query);
execution.fill(result, query.getPresentation().getSummary());
return result;
}, query);
}
- @Deprecated
- public FutureResult searchAndFill(Query query) {
- return searchAndFill(query, getDeprecatedExecutor());
- }
/**
* The future of this functions returns the original Result
*
* @see com.yahoo.search.searchchain.Execution
*/
- public FutureResult fill(Result result, String summaryClass, Executor executor) {
- return getFutureResult(executor, () -> {
+ public FutureResult fill(Result result, String summaryClass) {
+ return getFutureResult(execution.context().executor(), () -> {
execution.fill(result, summaryClass);
return result;
}, result.getQuery());
}
- @Deprecated
- public FutureResult fill(Result result, String summaryClass) {
- return fill(result, summaryClass, getDeprecatedExecutor());
- }
private static <T> Future<T> getFuture(Executor executor, Callable<T> callable) {
FutureTask<T> future = new FutureTask<>(callable);
@@ -206,14 +169,15 @@ public class AsyncExecution {
* done when the timeout expires, it will be cancelled, and it will return a
* result. All unfinished Futures will be cancelled.
*
- * @return the list of results in the same order as returned from the task
- * collection
+ * @return the list of results in the same order as returned from the task collection
*/
- public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs, Executor executor) {
+ public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) {
+ if (tasks.isEmpty()) return new ArrayList<>();
+
// Copy the list in case it is modified while we are waiting
List<FutureResult> workingTasks = new ArrayList<>(tasks);
try {
- runTask(executor, () -> {
+ runTask(tasks.stream().findAny().get().getExecution().context().executor(), () -> {
for (FutureResult task : workingTasks)
task.get(timeoutMs, TimeUnit.MILLISECONDS);
}).get(timeoutMs, TimeUnit.MILLISECONDS);
@@ -233,9 +197,5 @@ public class AsyncExecution {
}
return results;
}
- @Deprecated
- public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) {
- return waitForAll(tasks, timeoutMs, getDeprecatedExecutor());
- }
}
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java b/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java
index 86ba8664dee..fc2d44e01d9 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/Execution.java
@@ -18,6 +18,7 @@ import com.yahoo.search.rendering.RendererRegistry;
import com.yahoo.search.statistics.TimeTracker;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
/**
* <p>An execution of a search chain. This keeps track of the call state for an execution (in the calling thread)
@@ -122,7 +123,7 @@ public class Execution extends com.yahoo.processing.execution.Execution {
this.tokenRegistry = tokenRegistry;
this.rendererRegistry = rendererRegistry;
this.linguistics = linguistics;
- this.executor = executor != null ? executor : Runnable::run; // Run in same thread if no executor is provided
+ this.executor = executor != null ? executor : Executors.newSingleThreadExecutor();
}
/** @deprecated pass an executor */
@@ -362,7 +363,7 @@ public class Execution extends com.yahoo.processing.execution.Execution {
/**
* Returns the executor that should be used to execute tasks as part of this execution.
- * This is never null but will be an executor that runs in the same thread if none is passed to this.
+ * This is never null but will be an executor that runs a single thread if none is passed to this.
*/
public Executor executor() { return executor; }
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java
index 210a77ccf57..f6977849a02 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/ExecutionFactory.java
@@ -66,7 +66,7 @@ public class ExecutionFactory extends AbstractComponent {
this.specialTokens = new SpecialTokenRegistry(specialTokens);
this.linguistics = linguistics;
this.rendererRegistry = new RendererRegistry(renderers.allComponents());
- this.executor = executor != null ? executor : Runnable::run;
+ this.executor = executor != null ? executor : Executors.newSingleThreadExecutor();
}
/** @deprecated pass the container threadpool */
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
index 81d98f828fd..64bbcb4780c 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
@@ -24,7 +24,6 @@ public class FutureResult extends FutureTask<Result> {
private final Query query;
- /** Only used for generating messages */
private final Execution execution;
private final static Logger log = Logger.getLogger(FutureResult.class.getName());
@@ -90,6 +89,9 @@ public class FutureResult extends FutureTask<Result> {
return query;
}
+ /** Returns the execution which creates this */
+ public Execution getExecution() { return execution; }
+
private ErrorMessage createInterruptedError(Exception e) {
return ErrorMessage.createUnspecifiedError(execution + " was interrupted while executing: " +
Exceptions.toMessageString(e));