// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.vespa.config.search.DispatchConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;

/**
 * InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content
 * nodes in parallel. Operationally it first sends requests to all contained invokers and then
 * collects the results. The user of this class is responsible for merging the results if needed.
 *
 * @author ollivir
 */
public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> {

    private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());

    private final Set<SearchInvoker> invokers;
    private final SearchCluster searchCluster;
    private final Group group;
    private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
    private final Set<Integer> alreadyFailedNodes;
    private Query query;

    private boolean adaptiveTimeoutCalculated = false;
    private long adaptiveTimeoutMin = 0;
    private long adaptiveTimeoutMax = 0;
    private long deadline = 0;

    private long answeredDocs = 0;
    private long answeredActiveDocs = 0;
    private long answeredSoonActiveDocs = 0;
    private int askedNodes = 0;
    private int answeredNodes = 0;
    private int answeredNodesParticipated = 0;
    private boolean timedOut = false;
    private boolean degradedByMatchPhase = false;

    public InterleavedSearchInvoker(Collection<SearchInvoker> invokers,
                                    SearchCluster searchCluster,
                                    Group group,
                                    Set<Integer> alreadyFailedNodes) {
        super(Optional.empty());
        this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
        this.invokers.addAll(invokers);
        this.searchCluster = searchCluster;
        this.group = group;
        this.availableForProcessing = newQueue();
        this.alreadyFailedNodes = alreadyFailedNodes;
    }

    /**
     * Sends search queries to the contained {@link SearchInvoker} sub-invokers. If the search
     * query has an offset other than zero, it will be reset to zero and the expected hit amount
     * will be adjusted accordingly.
     */
    @Override
    protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException {
        this.query = query;
        invokers.forEach(invoker -> invoker.setMonitor(this));
        deadline = currentTime() + query.getTimeLeft();

        int originalHits = query.getHits();
        int originalOffset = query.getOffset();
        int neededHits = originalHits + originalOffset;
        int q = neededHits;
        if (group.isBalanced() && !group.isSparse()) {
            Double topKProbabilityOverride = query.properties().getDouble(Dispatcher.topKProbability);
            q = (topKProbabilityOverride != null)
                    ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topKProbabilityOverride)
                    : searchCluster.estimateHitsToFetch(neededHits, invokers.size());
        }
        query.setHits(q);
        query.setOffset(0);

        Object context = null;
        for (SearchInvoker invoker : invokers) {
            context = invoker.sendSearchRequest(query, context);
            askedNodes++;
        }
        query.setHits(originalHits);
        query.setOffset(originalOffset);
        return null;
    }
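    /**
     * Collects responses from the sub-invokers as they arrive and merges them into a single
     * result, stopping when all invokers have answered, the query has timed out, or the
     * adaptive-timeout policy decides that enough coverage has been reached.
     */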
    @Override
    protected InvokerResult getSearchResult(Execution execution) throws IOException {
        InvokerResult result = new InvokerResult(query, query.getHits());
        List<LeanHit> merged = Collections.emptyList();
        long nextTimeout = query.getTimeLeft();
        // Extra debug logging, active only for first-page queries asking for exactly 7 hits when FINE logging is on
        boolean extraDebug = (query.getOffset() == 0) && (query.getHits() == 7) && log.isLoggable(java.util.logging.Level.FINE);
        List<InvokerResult> processed = new ArrayList<>();
        try {
            while (!invokers.isEmpty() && nextTimeout >= 0) {
                SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
                if (invoker == null) {
                    log.fine(() -> "Search timed out with " + askedNodes + " requests made, " +
                                   answeredNodes + " responses received");
                    break;
                } else {
                    InvokerResult toMerge = invoker.getSearchResult(execution);
                    if (extraDebug) {
                        processed.add(toMerge);
                    }
                    merged = mergeResult(result.getResult(), toMerge, merged);
                    ejectInvoker(invoker);
                }
                nextTimeout = nextTimeout();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while waiting for search results", e);
        }

        insertNetworkErrors(result.getResult());
        result.getResult().setCoverage(createCoverage());

        if (extraDebug && merged.size() > 0) {
            int firstPartId = merged.get(0).getPartId();
            for (int index = 1; index < merged.size(); index++) {
                if (merged.get(index).getPartId() != firstPartId) {
                    extraDebug = false;
                    log.fine("merged[" + index + "/" + merged.size() + "] from partId " +
                             merged.get(index).getPartId() + ", first " + firstPartId);
                    break;
                }
            }
        }
        if (extraDebug) {
            log.fine("Interleaved " + processed.size() + " results");
            for (int pIdx = 0; pIdx < processed.size(); ++pIdx) {
                var p = processed.get(pIdx);
                log.fine("InvokerResult " + pIdx + " total hits " + p.getResult().getTotalHitCount());
                var lean = p.getLeanHits();
                for (int idx = 0; idx < lean.size(); ++idx) {
                    var hit = lean.get(idx);
                    log.fine("lean hit " + idx + " relevance " + hit.getRelevance() + " partid " + hit.getPartId());
                }
            }
            for (int mIdx = 0; mIdx < merged.size(); ++mIdx) {
                var hit = merged.get(mIdx);
                log.fine("merged hit " + mIdx + " relevance " + hit.getRelevance() + " partid " + hit.getPartId());
            }
        }
        int needed = query.getOffset() + query.getHits();
        for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) {
            result.getLeanHits().add(merged.get(index));
        }
        query.setOffset(0); // Now we are all trimmed down
        return result;
    }
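    /**
     * Adds error or trace entries for nodes that never answered, and for nodes that had
     * already failed before this query was dispatched. Network problems are reported as
     * result errors only when no node answered at all; otherwise they are just traced.
     */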
    private void insertNetworkErrors(Result result) {
        boolean asErrors = answeredNodes == 0;
        if (!invokers.isEmpty()) {
            String keys = invokers.stream()
                                  .map(SearchInvoker::distributionKey)
                                  .map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)"))
                                  .collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createTimeout(
                        "Backend communication timeout on all nodes in group (distribution-keys: " + keys + ")"));
            } else {
                query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2);
            }
            timedOut = true;
        }
        if (alreadyFailedNodes != null) {
            var message = "Connection failure on nodes with distribution-keys: " +
                          alreadyFailedNodes.stream().map(v -> Integer.toString(v)).collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createBackendCommunicationError(message));
            } else {
                query.trace(message, 2);
            }
            int failed = alreadyFailedNodes.size();
            askedNodes += failed;
            answeredNodes += failed;
        }
    }

    /**
     * Returns how long to wait for the next response. Until the configured minimum search
     * coverage has been reached, this is the full remaining query time; after that, an
     * adaptive timeout window is computed so the search can return early with slightly
     * degraded coverage instead of waiting for the remaining stragglers.
     */
    private long nextTimeout() {
        DispatchConfig config = searchCluster.dispatchConfig();
        double minimumCoverage = config.minSearchCoverage();
        if (askedNodes == answeredNodes || minimumCoverage >= 100.0) {
            return query.getTimeLeft();
        }
        int minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0);
        if (answeredNodes < minimumResponses) {
            return query.getTimeLeft();
        }

        long timeLeft = query.getTimeLeft();
        if (!adaptiveTimeoutCalculated) {
            adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor());
            adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor());
            adaptiveTimeoutCalculated = true;
        }

        long now = currentTime();
        int pendingQueries = askedNodes - answeredNodes;
        double missWidth = ((100.0 - config.minSearchCoverage()) * askedNodes) / 100.0 - 1.0;
        double slopedWait = adaptiveTimeoutMin;
        if (pendingQueries > 1 && missWidth > 0.0) {
            slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
        }
        long nextAdaptive = (long) slopedWait;
        if (now + nextAdaptive >= deadline) {
            return deadline - now;
        }
        deadline = now + nextAdaptive;
        return nextAdaptive;
    }

    private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current) {
        collectCoverage(partialResult.getResult().getCoverage(true));
        result.mergeWith(partialResult.getResult());
        List<Hit> partialNonLean = partialResult.getResult().hits().asUnorderedHits();
        for (Hit hit : partialNonLean) {
            if (hit.isAuxiliary()) {
                result.hits().add(hit);
            }
        }
        if (current.isEmpty()) {
            return partialResult.getLeanHits();
        }
        List<LeanHit> partial = partialResult.getLeanHits();
        if (partial.isEmpty()) {
            return current;
        }

        // Standard two-way merge of the already sorted lean hit lists, capped at the number of hits needed
        int needed = query.getOffset() + query.getHits();
        List<LeanHit> merged = new ArrayList<>(needed);
        int indexCurrent = 0;
        int indexPartial = 0;
        while (indexCurrent < current.size() && indexPartial < partial.size() && merged.size() < needed) {
            LeanHit incomingHit = partial.get(indexPartial);
            LeanHit currentHit = current.get(indexCurrent);
            int cmpRes = currentHit.compareTo(incomingHit);
            if (cmpRes < 0) {
                merged.add(currentHit);
                indexCurrent++;
            } else if (cmpRes > 0) {
                merged.add(incomingHit);
                indexPartial++;
            } else { // Duplicates
                merged.add(currentHit);
                indexCurrent++;
                indexPartial++;
            }
        }
        while ((indexCurrent < current.size()) && (merged.size() < needed)) {
            LeanHit currentHit = current.get(indexCurrent++);
            merged.add(currentHit);
        }
        while ((indexPartial < partial.size()) && (merged.size() < needed)) {
            LeanHit incomingHit = partial.get(indexPartial++);
            merged.add(incomingHit);
        }
        return merged;
    }

    private void collectCoverage(Coverage source) {
        answeredDocs += source.getDocs();
        answeredActiveDocs += source.getActive();
        answeredSoonActiveDocs += source.getSoonActive();
        answeredNodesParticipated += source.getNodes();
        answeredNodes++;
        degradedByMatchPhase |= source.isDegradedByMatchPhase();
        timedOut |= source.isDegradedByTimeout();
    }
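    /**
     * Builds the aggregate {@link Coverage} for this search, after extrapolating document
     * counts for nodes that did not answer, and records why coverage was degraded.
     */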
    private Coverage createCoverage() {
        adjustDegradedCoverage();

        Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
        coverage.setNodesTried(askedNodes);
        coverage.setSoonActive(answeredSoonActiveDocs);
        int degradedReason = 0;
        if (timedOut) {
            degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT);
        }
        if (degradedByMatchPhase) {
            degradedReason |= DEGRADED_BY_MATCH_PHASE;
        }
        coverage.setDegradedReason(degradedReason);
        return coverage;
    }

    private void adjustDegradedCoverage() {
        if (askedNodes == answeredNodesParticipated) {
            return;
        }
        int notAnswered = askedNodes - answeredNodesParticipated;
        if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) {
            answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
            answeredSoonActiveDocs += (notAnswered * answeredSoonActiveDocs / answeredNodesParticipated);
        } else {
            if (askedNodes > answeredNodesParticipated) {
                int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies();
                int missingNodes = notAnswered - (searchableCopies - 1);
                if (answeredNodesParticipated > 0) {
                    answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
                    answeredSoonActiveDocs += (missingNodes * answeredSoonActiveDocs / answeredNodesParticipated);
                    timedOut = true;
                }
            }
        }
    }

    private void ejectInvoker(SearchInvoker invoker) {
        invokers.remove(invoker);
        invoker.release();
    }

    @Override
    protected void release() {
        if (!invokers.isEmpty()) {
            invokers.forEach(SearchInvoker::close);
            invokers.clear();
        }
    }

    @Override
    public void responseAvailable(SearchInvoker from) {
        if (availableForProcessing != null) {
            availableForProcessing.add(from);
        }
    }

    @Override
    protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) {
        // never to be called
    }

    // For overriding in tests
    protected long currentTime() {
        return System.currentTimeMillis();
    }

    // For overriding in tests
    protected LinkedBlockingQueue<SearchInvoker> newQueue() {
        return new LinkedBlockingQueue<>();
    }

    // For testing
    Collection<SearchInvoker> invokers() {
        return invokers;
    }

}