summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-11-23 15:20:13 +0100
committerOlli Virtanen <olli.virtanen@oath.com>2018-11-23 15:20:13 +0100
commit231e8c1f4996672b1c1b21a109af90c95ec455df (patch)
treec2c94c6d3d80815151ad4ce3df8ffa34fdf9c92b /container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
parent54f02674ba2edc2ddc7bf84714b6aca84f3282f9 (diff)
Adaptive timeout support in java dispatch
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java145
1 files changed, 138 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 9ff43df87cf..83647b332e6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -5,12 +5,25 @@ import com.yahoo.fs4.QueryPacket;
import com.yahoo.prelude.fastsearch.CacheKey;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
/**
* InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content
@@ -19,11 +32,25 @@ import java.util.Map;
*
* @author ollivir
*/
-public class InterleavedSearchInvoker extends SearchInvoker {
- private final Collection<SearchInvoker> invokers;
+public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> {
+ private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());
+
+ private final Set<SearchInvoker> invokers;
+ private final SearchCluster searchCluster;
+ private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
+ private Query query;
+
+ private boolean adaptiveTimeoutCalculated = false;
+ private long adaptiveTimeoutMin = 0;
+ private long adaptiveTimeoutMax = 0;
+ private long deadline = 0;
- public InterleavedSearchInvoker(Map<Integer, SearchInvoker> invokers) {
- this.invokers = new ArrayList<>(invokers.values());
+ public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster) {
+ super(Optional.empty());
+ this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
+ this.invokers.addAll(invokers);
+ this.searchCluster = searchCluster;
+ this.availableForProcessing = newQueue();
}
/**
@@ -33,27 +60,109 @@ public class InterleavedSearchInvoker extends SearchInvoker {
*/
@Override
protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
+ this.query = query;
+ invokers.forEach(invoker -> invoker.setMonitor(this));
+ deadline = currentTime() + query.getTimeLeft();
+
int originalHits = query.getHits();
int originalOffset = query.getOffset();
query.setHits(query.getHits() + query.getOffset());
query.setOffset(0);
+
for (SearchInvoker invoker : invokers) {
invoker.sendSearchRequest(query, null);
}
+
query.setHits(originalHits);
query.setOffset(originalOffset);
}
@Override
protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
+ int requests = invokers.size();
+ int responses = 0;
List<Result> results = new ArrayList<>();
- for (SearchInvoker invoker : invokers) {
- results.addAll(invoker.getSearchResults(cacheKey));
+ long nextTimeout = query.getTimeLeft();
+ try {
+ while (!invokers.isEmpty() && nextTimeout >= 0) {
+ SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
+ if (invoker == null) {
+ if (log.isLoggable(Level.FINE)) {
+ log.fine("Search timed out with " + requests + " requests made, " + responses + " responses received");
+ }
+ break;
+ } else {
+ invokers.remove(invoker);
+ results.addAll(invoker.getSearchResults(cacheKey));
+ responses++;
+ }
+ nextTimeout = nextTimeout(requests, responses);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for search results", e);
}
+
+ insertTimeoutErrors(results);
return results;
}
+ private void insertTimeoutErrors(List<Result> results) {
+ int degradedReason = adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT;
+
+ for (SearchInvoker invoker : invokers) {
+ Optional<Integer> dk = invoker.distributionKey();
+ String message;
+ if (dk.isPresent()) {
+ message = "Backend communication timeout on node with distribution-key " + dk.get();
+ } else {
+ message = "Backend communication timeout";
+ }
+ Result error = new Result(query, ErrorMessage.createBackendCommunicationError(message));
+ invoker.getErrorCoverage().ifPresent(coverage -> {
+ coverage.setDegradedReason(degradedReason);
+ error.setCoverage(coverage);
+ });
+ results.add(error);
+ }
+ }
+
+ private long nextTimeout(int requests, int responses) {
+ DispatchConfig config = searchCluster.dispatchConfig();
+ double minimumCoverage = config.minSearchCoverage();
+
+ if (requests == responses || minimumCoverage >= 100.0) {
+ return query.getTimeLeft();
+ }
+ int minimumResponses = (int) (requests * minimumCoverage / 100.0);
+
+ if (responses < minimumResponses) {
+ return query.getTimeLeft();
+ }
+
+ long timeLeft = query.getTimeLeft();
+ if (!adaptiveTimeoutCalculated) {
+ adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor());
+ adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor());
+ adaptiveTimeoutCalculated = true;
+ }
+
+ long now = currentTime();
+ int pendingQueries = requests - responses;
+ double missWidth = ((100.0 - config.minSearchCoverage()) * requests) / 100.0 - 1.0;
+ double slopedWait = adaptiveTimeoutMin;
+ if (pendingQueries > 1 && missWidth > 0.0) {
+ slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
+ }
+ long nextAdaptive = (long) slopedWait;
+ if (now + nextAdaptive >= deadline) {
+ return deadline - now;
+ }
+ deadline = now + nextAdaptive;
+
+ return nextAdaptive;
+ }
+
@Override
protected void release() {
if (!invokers.isEmpty()) {
@@ -61,4 +170,26 @@ public class InterleavedSearchInvoker extends SearchInvoker {
invokers.clear();
}
}
+
+ @Override
+ public void responseAvailable(SearchInvoker from) {
+ if (availableForProcessing != null) {
+ availableForProcessing.add(from);
+ }
+ }
+
+ @Override
+ protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) {
+ // never to be called
+ }
+
+ // For overriding in tests
+ protected long currentTime() {
+ return System.currentTimeMillis();
+ }
+
+ // For overriding in tests
+ protected LinkedBlockingQueue<SearchInvoker> newQueue() {
+ return new LinkedBlockingQueue<>();
+ }
}