diff options
11 files changed, 685 insertions, 690 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java new file mode 100644 index 00000000000..a7983447a81 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java @@ -0,0 +1,134 @@ +package com.yahoo.search.federation; + +import com.google.common.collect.ImmutableList; +import com.yahoo.search.Result; +import com.yahoo.search.searchchain.FutureResult; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * The result of a federation to targets which knows how to wait for the result from each target. + * This thread handles multiple threads producing target results but only a single thread may use an instance of this. + * + * @author bratseth + */ +class FederationResult { + + /** All targets of this */ + private final List<TargetResult> targetResults; + + /** + * The remaining targets to wait for. + * Other targets are either complete, or should only be included if they are available when we complete + */ + private List<TargetResult> targetsToWaitFor; + + private FederationResult(ImmutableList<TargetResult> targetResults) { + this.targetResults = targetResults; + + if (targetResults.stream().anyMatch(TargetResult::isMandatory)) + targetsToWaitFor = targetResults.stream().filter(TargetResult::isMandatory).collect(Collectors.toList()); + else + targetsToWaitFor = new ArrayList<>(targetResults); + } + + /** + * Wait on each target for that targets timeout + * On the worst case this is the same as waiting for the max target timeout, + * in the average case it may be much better because lower timeout sources do not get to + * drive the timeout above their own timeout value. + * When this completes, results can be accessed from the TargetResults with no blocking + * (i.e getOrTimeout) without breaking any contract. + */ + public void waitForAll(int queryTimeout, Clock clock) { + long startTime = clock.millis(); + while ( ! targetsToWaitFor.isEmpty()) { + TargetResult nextToWaitFor = targetWithSmallestTimeout(targetsToWaitFor, queryTimeout); + long timeLeftOfNextTimeout = nextToWaitFor.timeout(queryTimeout) - ( clock.millis() - startTime ); + nextToWaitFor.getIfAvailable(timeLeftOfNextTimeout); + targetsToWaitFor.remove(nextToWaitFor); + } + } + + /** Returns an immutable list of the results of this */ + public List<TargetResult> all() { return targetResults; } + + private TargetResult targetWithSmallestTimeout(List<TargetResult> results, int queryTimeout) { + TargetResult smallest = null; + for (TargetResult result : results) { + if (smallest == null || result.timeout(queryTimeout) < smallest.timeout(queryTimeout)) + smallest = result; + } + return smallest; + } + + static class TargetResult { + + final FederationSearcher.Target target; + private final FutureResult futureResult; + + /** + * Single threaded access to result already returned from futureResult, if any. + * To avoid unnecessary synchronization with the producer thread. + */ + private Optional<Result> availableResult = Optional.empty(); + + private TargetResult(FederationSearcher.Target target, FutureResult futureResult) { + this.target = target; + this.futureResult = futureResult; + } + + private boolean isMandatory() { return ! target.federationOptions().getOptional(); } + + /** + * Returns the result of this by blocking until timeout if necessary. + * + * @return the result if available, or null otherwise + */ + public Optional<Result> getIfAvailable(long timeout) { + if (availableResult.isPresent()) return availableResult; + availableResult = futureResult.getIfAvailable(timeout, TimeUnit.MILLISECONDS); + return availableResult; + } + + /** Returns a result without blocking; if the result is not available one with a timeout error is produced */ + public Result getOrTimeoutError() { + // The else part is to offload creation of the timeout error + return getIfAvailable(0).orElse(futureResult.get(0, TimeUnit.MILLISECONDS)); + } + + public boolean successfullyCompleted() { + return futureResult.isDone() && ! futureResult.isCancelled(); + } + + private int timeout(long queryTimeout) { + return (int)target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout); + } + + @Override + public String toString() { + return "result for " + target; + } + + } + + public static class Builder { + + private final ImmutableList.Builder<TargetResult> results = new ImmutableList.Builder(); + + public void add(FederationSearcher.Target target, FutureResult futureResult) { + results.add(new TargetResult(target, futureResult)); + } + + public FederationResult build() { + return new FederationResult(results.build()); + } + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java index 9c94fd04ea5..621f0fbe090 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java @@ -22,7 +22,6 @@ import com.yahoo.search.federation.sourceref.SearchChainResolver; import com.yahoo.search.federation.sourceref.SingleTarget; import com.yahoo.search.federation.sourceref.SourceRefResolver; import com.yahoo.search.federation.sourceref.SourcesTarget; -import com.yahoo.search.federation.sourceref.Target; import com.yahoo.search.federation.sourceref.UnresolvedSearchChainException; import com.yahoo.search.query.Properties; import com.yahoo.search.query.properties.QueryProperties; @@ -46,6 +45,7 @@ import static com.yahoo.collections.CollectionUtil.first; import static com.yahoo.container.util.Util.quote; import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceProperties; +import java.time.Clock; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; @@ -54,8 +54,9 @@ import java.util.logging.Logger; /** * This searcher takes a set of sources, looks them up in config and fire off the correct searchchains. * - * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + * @author Arne Bergene Fossaa * @author tonytv + * @author bratseth */ @Provides(FederationSearcher.FEDERATION) @After("*") @@ -63,195 +64,12 @@ public class FederationSearcher extends ForkingSearcher { public static final String FEDERATION = "Federation"; - /** A target for federation, containing a chain to which a federation query can be forwarded. */ - private static abstract class TargetHandler { - - abstract Chain<Searcher> getChain(); - abstract void modifyTargetQuery(Query query); - abstract void modifyTargetResult(Result result); - - ComponentId getId() { - return getChain().getId(); - } - - public abstract FederationOptions federationOptions(); - - @Override - public String toString() { return getChain().getId().stringValue(); } - - } - - /** - * A handler representing a target created by the federation logic. - * This is a value object, to ensure that identical target invocations are not invoked multiple times. - */ - private static class StandardTargetHandler extends TargetHandler { - - private final SearchChainInvocationSpec target; - private final Chain<Searcher> chain; - - public StandardTargetHandler(SearchChainInvocationSpec target, Chain<Searcher> chain) { - this.target = target; - this.chain = chain; - } - - @Override - Chain<Searcher> getChain() { return chain; } - - @Override - void modifyTargetQuery(Query query) {} - @Override - void modifyTargetResult(Result result) {} - - @Override - public FederationOptions federationOptions() { return target.federationOptions; } - - @Override - public boolean equals(Object o) { - if (o == this) return true; - if ( ! ( o instanceof StandardTargetHandler)) return false; - - StandardTargetHandler other = (StandardTargetHandler)o; - if ( ! Objects.equals(other.chain.getId(), this.chain.getId())) return false; - if ( ! Objects.equals(other.target, this.target)) return false; - return true; - } - - @Override - public int hashCode() { return Objects.hash(chain.getId(), target); } - - } - - /** A target handler where the target generation logic is delegated to the application provided target selector */ - private static class CustomTargetHandler<T> extends TargetHandler { - - private final TargetSelector<T> selector; - private final FederationTarget<T> target; - - CustomTargetHandler(TargetSelector<T> selector, FederationTarget<T> target) { - this.selector = selector; - this.target = target; - } - - @Override - Chain<Searcher> getChain() { - return target.getChain(); - } - - @Override - public void modifyTargetQuery(Query query) { - selector.modifyTargetQuery(target, query); - } - - @Override - public void modifyTargetResult(Result result) { - selector.modifyTargetResult(target, result); - } - - @Override - public FederationOptions federationOptions() { - return target.getFederationOptions(); - } - - } - - private static class ExecutionInfo { - - private final TargetHandler targetHandler; - private final FederationOptions federationOptions; - private final FutureResult futureResult; - - public ExecutionInfo(TargetHandler targetHandler, FederationOptions federationOptions, FutureResult futureResult) { - this.targetHandler = targetHandler; - this.federationOptions = federationOptions; - this.futureResult = futureResult; - } - - } - - private static class CompoundKey { - - private final String sourceName; - private final String propertyName; - - CompoundKey(String sourceName, String propertyName) { - this.sourceName = sourceName; - this.propertyName = propertyName; - } - - @Override - public int hashCode() { - return sourceName.hashCode() ^ propertyName.hashCode(); - } - - @Override - public boolean equals(Object o) { - CompoundKey rhs = (CompoundKey) o; - return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName); - } - - @Override - public String toString() { - return sourceName + '.' + propertyName; - } - } - - private static class SourceKey extends CompoundKey { - - public static final String SOURCE = "source."; - - SourceKey(String sourceName, String propertyName) { - super(sourceName, propertyName); - } - - @Override - public int hashCode() { - return super.hashCode() ^ 7; - } - - @Override - public boolean equals(Object o) { - return (o instanceof SourceKey) && super.equals(o); - } - - @Override - public String toString() { - return SOURCE + super.toString(); - } - } - - private static class ProviderKey extends CompoundKey { - - public static final String PROVIDER = "provider."; - - ProviderKey(String sourceName, String propertyName) { - super(sourceName, propertyName); - } - - @Override - public int hashCode() { - return super.hashCode() ^ 17; - } - - @Override - public boolean equals(Object o) { - return (o instanceof ProviderKey) && super.equals(o); - } - - @Override - public String toString() { - return PROVIDER + super.toString(); - } - - } - private static final Logger log = Logger.getLogger(FederationSearcher.class.getName()); /** The name of the query property containing the source name added to the query to each source by this */ public final static CompoundName SOURCENAME = new CompoundName("sourceName"); public final static CompoundName PROVIDERNAME = new CompoundName("providerName"); - /** Logging field name constants */ public static final String LOG_COUNT_PREFIX = "count_"; @@ -263,6 +81,7 @@ public class FederationSearcher extends ForkingSearcher { private final boolean strictSearchchain; private final TargetSelector<?> targetSelector; + private final Clock clock = Clock.systemUTC(); @Inject public FederationSearcher(FederationConfig config, StrictContractsConfig strict, @@ -274,7 +93,6 @@ public class FederationSearcher extends ForkingSearcher { private static TargetSelector resolveSelector(String selectorId, ComponentRegistry<TargetSelector> targetSelectors) { if (selectorId.isEmpty()) return null; - return checkNotNull(targetSelectors.getComponent(selectorId), "Missing target selector with id" + quote(selectorId)); } @@ -294,7 +112,6 @@ public class FederationSearcher extends ForkingSearcher { this.targetSelector = targetSelector; } - private static SearchChainResolver createResolver(FederationConfig config) { SearchChainResolver.Builder builder = new SearchChainResolver.Builder(); @@ -345,165 +162,89 @@ public class FederationSearcher extends ForkingSearcher { setRequestTimeoutInMilliseconds(searchChain.requestTimeoutMillis()); } - private static long calculateTimeout(Query query, Collection<TargetHandler> targets) { - - class PartitionByOptional { - final List<TargetHandler> mandatoryTargets; - final List<TargetHandler> optionalTargets; + @Override + public Result search(Query query, Execution execution) { + Result mergedResults = execution.search(query); - PartitionByOptional(Collection<TargetHandler> targets) { - List<TargetHandler> mandatoryTargets = new ArrayList<>(); - List<TargetHandler> optionalTargets = new ArrayList<>(); + Results<SearchChainInvocationSpec, UnresolvedSearchChainException> targets = + getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts()); + warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits()); - for (TargetHandler target : targets) { - if (target.federationOptions().getOptional()) { - optionalTargets.add(target); - } else { - mandatoryTargets.add(target); - } - } + Collection<SearchChainInvocationSpec> prunedTargets = + pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data()); - this.mandatoryTargets = Collections.unmodifiableList(mandatoryTargets); - this.optionalTargets = Collections.unmodifiableList(optionalTargets); - } - } + Results<Target, ErrorMessage> regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry()); + query.errors().addAll(regularTargetHandlers.errors()); - if (query.requestHasProperty("timeout") || targets.isEmpty()) { - return query.getTimeLeft(); - } else { - PartitionByOptional partition = new PartitionByOptional(targets); - long queryTimeout = query.getTimeout(); + Set<Target> targetHandlers = new LinkedHashSet<>(regularTargetHandlers.data()); + targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector)); - return partition.mandatoryTargets.isEmpty() ? - maximumTimeout(partition.optionalTargets, queryTimeout) : - maximumTimeout(partition.mandatoryTargets, queryTimeout); - } - } + traceTargets(query, targetHandlers); - private static long maximumTimeout(List<TargetHandler> invocationSpecs, long queryTimeout) { - long timeout = 0; - for (TargetHandler target : invocationSpecs) { - timeout = Math.max(timeout, - target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout)); - } - return timeout; + if (targetHandlers.isEmpty()) + return mergedResults; + else if (targetHandlers.size() > 1) + search(query, execution, targetHandlers, mergedResults); + else if (shouldExecuteTargetLongerThanThread(query, targetHandlers.iterator().next())) + search(query, execution, targetHandlers, mergedResults); // one target, but search in separate thread + else + search(query, execution, first(targetHandlers), mergedResults); // search in this thread + return mergedResults; } - private void addSearchChainTimedOutError(Query query, - ComponentId searchChainId) { - ErrorMessage timeoutMessage= - ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out."); - timeoutMessage.setSource(searchChainId.stringValue()); - query.errors().add(timeoutMessage); + private void search(Query query, Execution execution, Target target, Result mergedResults) { + Result result = search(query, execution, target); + mergeResult(query, target, mergedResults, result); } - private void mergeResult(Query query, TargetHandler targetHandler, - Result mergedResults, Result result) { - - - targetHandler.modifyTargetResult(result); - final ComponentId searchChainId = targetHandler.getId(); - Chain<Searcher> searchChain = targetHandler.getChain(); + private void search(Query query, Execution execution, Collection<Target> targets, Result mergedResults) { + FederationResult results = search(query, execution, targets); + results.waitForAll((int)query.getTimeLeft(), clock); - mergedResults.mergeWith(result); - HitGroup group = result.hits(); - group.setId("source:" + searchChainId.getName()); - - group.setSearcherSpecificMetaData(this, searchChain); - group.setMeta(false); // Set hit groups as non-meta as a default - group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count - group.setSource(searchChainId.getName()); - group.setQuery(result.getQuery()); - - for (Iterator<Hit> it = group.unorderedDeepIterator(); it.hasNext();) { - Hit hit = it.next(); - hit.setSearcherSpecificMetaData(this, searchChain); - hit.setSource(searchChainId.stringValue()); - - // This is the backend request meta hit, that is holding logging information - // See HTTPBackendSearcher, where this hit is created - if (hit.isMeta() && hit.types().contains("logging")) { - // Augment this hit with count fields - hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount()); - hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount()); - int offset = result.getQuery().getOffset(); - hit.setField(LOG_COUNT_PREFIX + "first", offset + 1); - hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset); + HitOrderer s = null; + for (FederationResult.TargetResult targetResult : results.all()) { + if ( ! targetResult.successfullyCompleted()) { + addSearchChainTimedOutError(query, targetResult.target.getId()); + } else { + if (s == null) { + s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), targetResult.getOrTimeoutError().hits().getOrderer()); + } + mergeResult(query, targetResult.target, mergedResults, targetResult.getOrTimeoutError()); } - } - if (query.getTraceLevel()>=4) - query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4); - mergedResults.hits().add(group); } - private boolean successfullyCompleted(FutureResult result) { - return result.isDone() && !result.isCancelled(); - } - - private Query setupSingleQuery(Query query, long timeout, TargetHandler targetHandler) { + private Result search(Query query, Execution execution, Target target) { + long timeout = target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(query.getTimeLeft()); + Execution newExecution = new Execution(target.getChain(), execution.context()); if (strictSearchchain) { query.resetTimeout(); - return setupFederationQuery(query, query, - windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler); + return newExecution.search(createFederationQuery(query, query, Window.from(query), timeout, target)); } else { - return cloneFederationQuery(query, - windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler); + return newExecution.search(cloneFederationQuery(query, Window.from(query), timeout, target)); } } - private Result startExecuteSingleQuery(Query query, TargetHandler chain, long timeout, Execution execution) { - Query outgoing = setupSingleQuery(query, timeout, chain); - Execution exec = new Execution(chain.getChain(), execution.context()); - return exec.search(outgoing); - } - - private List<ExecutionInfo> startExecuteQueryForEachTarget( - Query query, Collection<TargetHandler> targets, long timeout, Execution execution) { - - List<ExecutionInfo> results = new ArrayList<>(); - - Map<String, Object> windowParameters; - if (targets.size()==1) // preserve requested top-level offset by default as an optimization - windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits(), query.getOffset())); - else // request from offset 0 to enable correct upstream blending into a single top-level hit list - windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits() + query.getOffset(), 0)); - - for (TargetHandler targetHandler : targets) { - long executeTimeout = timeout; - if (targetHandler.federationOptions().getRequestTimeoutInMilliseconds() != -1) - executeTimeout = targetHandler.federationOptions().getRequestTimeoutInMilliseconds(); - results.add(new ExecutionInfo(targetHandler, targetHandler.federationOptions(), - createFutureSearch(query, windowParameters, targetHandler, executeTimeout, execution))); - } - - return results; - } - - private Map<String, Object> windowParameters(int hits, int offset) { - Map<String, Object> params = new HashMap<>(); - params.put(Query.HITS.toString(), hits); - params.put(Query.OFFSET.toString(), offset); - return params; + private FederationResult search(Query query, Execution execution, Collection<Target> targets) { + FederationResult.Builder result = new FederationResult.Builder(); + for (Target target : targets) + result.add(target, searchAsynchronously(query, execution, Window.from(targets, query), target)); + return result.build(); } - private FutureResult createFutureSearch(Query query, Map<String, Object> windowParameters, TargetHandler targetHandler, - long timeout, Execution execution) { - Query clonedQuery = cloneFederationQuery(query, windowParameters, timeout, targetHandler); - return new AsyncExecution(targetHandler.getChain(), execution).search(clonedQuery); + private FutureResult searchAsynchronously(Query query, Execution execution, Window window, Target target) { + long timeout = target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(query.getTimeLeft()); + Query clonedQuery = cloneFederationQuery(query, window, timeout, target); + return new AsyncExecution(target.getChain(), execution).search(clonedQuery); } - - private Query cloneFederationQuery(Query query, - Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) { + private Query cloneFederationQuery(Query query, Window window, long timeout, Target target) { Query clonedQuery = Query.createNewQuery(query); - return setupFederationQuery(query, clonedQuery, windowParameters, timeout, targetHandler); + return createFederationQuery(query, clonedQuery, window, timeout, target); } - private Query setupFederationQuery(Query query, Query outgoing, - Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) { - - ComponentId chainId = targetHandler.getChain().getId(); + private Query createFederationQuery(Query query, Query outgoing, Window window, long timeout, Target target) { + ComponentId chainId = target.getChain().getId(); String sourceName = chainId.getName(); outgoing.properties().set(SOURCENAME, sourceName); @@ -516,31 +257,28 @@ public class FederationSearcher extends ForkingSearcher { switch (propagateSourceProperties) { case ALL: - propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName, + propagatePerSourceQueryProperties(query, outgoing, window, sourceName, providerName, QueryProperties.PER_SOURCE_QUERY_PROPERTIES); break; case OFFSET_HITS: - propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName, + propagatePerSourceQueryProperties(query, outgoing, window, sourceName, providerName, new CompoundName[]{Query.OFFSET, Query.HITS}); break; } //TODO: FederationTarget //TODO: only for target produced by this, not others - targetHandler.modifyTargetQuery(outgoing); + target.modifyTargetQuery(outgoing); return outgoing; } - private void propagatePerSourceQueryProperties(Query original, Query outgoing, - Map<String, Object> windowParameters, + private void propagatePerSourceQueryProperties(Query original, Query outgoing, Window window, String sourceName, String providerName, CompoundName[] queryProperties) { - for (CompoundName key : queryProperties) { - Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, windowParameters.get(key.toString())); - if (value != null) { + Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, window.get(key)); + if (value != null) outgoing.properties().set(key, value); - } } } @@ -552,12 +290,10 @@ public class FederationSearcher extends ForkingSearcher { result = getProperty(query, new ProviderKey(providerName, propertyName.toString())); if (result == null) result = defaultValue; - return result; } private Object getProperty(Query query, CompoundKey key) { - CompoundName name = map.get(key); if (name == null) { name = new CompoundName(key.toString()); @@ -567,22 +303,15 @@ public class FederationSearcher extends ForkingSearcher { } private ErrorMessage missingSearchChainsErrorMessage(List<UnresolvedSearchChainException> unresolvedSearchChainExceptions) { - StringBuilder sb = new StringBuilder(); - sb.append(StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' ')); - - - sb.append(" Valid source refs are "); - sb.append( - StringUtils.join(allSourceRefDescriptions().iterator(), - ", ")).append('.'); - - return ErrorMessage.createInvalidQueryParameter(sb.toString()); + String message = StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' ') + + " Valid source refs are " + StringUtils.join(allSourceRefDescriptions().iterator(), ", ") +'.'; + return ErrorMessage.createInvalidQueryParameter(message); } private List<String> allSourceRefDescriptions() { List<String> descriptions = new ArrayList<>(); - for (Target target : searchChainResolver.allTopLevelTargets()) { + for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.allTopLevelTargets()) { descriptions.add(target.searchRefDescription()); } return descriptions; @@ -598,7 +327,6 @@ public class FederationSearcher extends ForkingSearcher { private void warnIfUnresolvedSearchChains(List<UnresolvedSearchChainException> missingTargets, HitGroup errorHitGroup) { - if (!missingTargets.isEmpty()) { errorHitGroup.addError(missingSearchChainsErrorMessage(missingTargets)); } @@ -608,7 +336,7 @@ public class FederationSearcher extends ForkingSearcher { public Collection<CommentedSearchChain> getSearchChainsForwarded(SearchChainRegistry registry) { List<CommentedSearchChain> searchChains = new ArrayList<>(); - for (Target target : searchChainResolver.allTopLevelTargets()) { + for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.allTopLevelTargets()) { if (target instanceof SourcesTarget) { searchChains.addAll(commentedSourceProviderSearchChains((SourcesTarget)target, registry)); } else if (target instanceof SingleTarget) { @@ -623,12 +351,11 @@ public class FederationSearcher extends ForkingSearcher { private CommentedSearchChain commentedSearchChain(SingleTarget singleTarget, SearchChainRegistry registry) { return new CommentedSearchChain("If source refs contains '" + singleTarget.getId() + "'.", - registry.getChain(singleTarget.getId())); + registry.getChain(singleTarget.getId())); } private List<CommentedSearchChain> commentedSourceProviderSearchChains(SourcesTarget sourcesTarget, SearchChainRegistry registry) { - List<CommentedSearchChain> commentedSearchChains = new ArrayList<>(); String ifMatchingSourceRefPrefix = "If source refs contains '" + sourcesTarget.getId() + "' and provider is '"; @@ -646,9 +373,11 @@ public class FederationSearcher extends ForkingSearcher { return commentedSearchChains; } - /** Returns the set of properties set for the source or provider given in the query (if any). + /** + * Returns the set of properties set for the source or provider given in the query (if any). * - * If the query has not set sourceName or providerName, null will be returned */ + * If the query has not set sourceName or providerName, null will be returned + */ public static Properties getSourceProperties(Query query) { String sourceName = query.properties().getString(SOURCENAME); String providerName = query.properties().getString(PROVIDERNAME); @@ -661,12 +390,12 @@ public class FederationSearcher extends ForkingSearcher { } @Override - public void fill(final Result result, final String summaryClass, Execution execution) { + public void fill(Result result, String summaryClass, Execution execution) { List<FutureResult> filledResults = new ArrayList<>(); UniqueExecutionsToResults uniqueExecutionsToResults = new UniqueExecutionsToResults(); addResultsToFill(result.hits(), result, summaryClass, uniqueExecutionsToResults); - final Set<Entry<Chain<Searcher>, Map<Query, Result>>> resultsForAllChains = uniqueExecutionsToResults.resultsToFill - .entrySet(); + Set<Entry<Chain<Searcher>, Map<Query, Result>>> resultsForAllChains = + uniqueExecutionsToResults.resultsToFill.entrySet(); int numberOfCallsToFillNeeded = 0; for (Entry<Chain<Searcher>, Map<Query, Result>> resultsToFillForAChain : resultsForAllChains) { @@ -699,31 +428,6 @@ public class FederationSearcher extends ForkingSearcher { destination.hits().addError(error); } - /** A map from a unique search chain and query instance to a result */ - private static class UniqueExecutionsToResults { - - /** Implemented as a nested identity hashmap */ - final Map<Chain<Searcher>,Map<Query,Result>> resultsToFill = new IdentityHashMap<>(); - - /** Returns a result to fill for a query and chain, by creating it if necessary */ - public Result get(Chain<Searcher> chain, Query query) { - Map<Query,Result> resultsToFillForAChain = resultsToFill.get(chain); - if (resultsToFillForAChain == null) { - resultsToFillForAChain = new IdentityHashMap<>(); - resultsToFill.put(chain,resultsToFillForAChain); - } - - Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query); - if (resultsToFillForAChainAndQuery == null) { - resultsToFillForAChainAndQuery = new Result(query); - resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery); - } - - return resultsToFillForAChainAndQuery; - } - - } - private void addResultsToFill(HitGroup hitGroup, Result result, String summaryClass, UniqueExecutionsToResults uniqueExecutionsToResults) { for (Hit hit : hitGroup) { @@ -744,28 +448,6 @@ public class FederationSearcher extends ForkingSearcher { return uniqueExecutionsToResults.get(chain,query); } - private void searchMultipleTargets(Query query, Result mergedResults, - Collection<TargetHandler> targets, - long timeout, - Execution execution) { - - List<ExecutionInfo> executionInfos = startExecuteQueryForEachTarget(query, targets, timeout, execution); - waitForMandatoryTargets(executionInfos, query.getTimeout()); - - HitOrderer s=null; - for (ExecutionInfo executionInfo : executionInfos) { - if ( ! successfullyCompleted(executionInfo.futureResult)) { - addSearchChainTimedOutError(query, executionInfo.targetHandler.getId()); - } else { - if (s == null) { - s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), executionInfo.futureResult.get().hits().getOrderer()); - } - mergeResult(query, executionInfo.targetHandler, mergedResults, executionInfo.futureResult.get()); - - } - } - } - /** * TODO This is probably a dirty hack for bug 4711376. There are probably better ways. * But I will leave that to trd-processing@ @@ -785,46 +467,6 @@ public class FederationSearcher extends ForkingSearcher { return orderer; } - private void waitForMandatoryTargets(List<ExecutionInfo> executionInfos, long queryTimeout) { - FutureWaiter futureWaiter = new FutureWaiter(); - - boolean hasMandatoryTargets = false; - for (ExecutionInfo executionInfo : executionInfos) { - if (isMandatory(executionInfo)) { - futureWaiter.add(executionInfo.futureResult, - getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout)); - hasMandatoryTargets = true; - } - } - - if (!hasMandatoryTargets) { - for (ExecutionInfo executionInfo : executionInfos) { - futureWaiter.add(executionInfo.futureResult, - getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout)); - } - } - - futureWaiter.waitForFutures(); - } - - private long getSearchChainExecutionTimeoutInMilliseconds(ExecutionInfo executionInfo, long queryTimeout) { - return executionInfo.federationOptions. - getSearchChainExecutionTimeoutInMilliseconds(queryTimeout); - } - - private boolean isMandatory(ExecutionInfo executionInfo) { - return !executionInfo.federationOptions.getOptional(); - } - - private void searchSingleTarget(Query query, Result mergedResults, - TargetHandler targetHandler, - long timeout, - Execution execution) { - Result result = startExecuteSingleQuery(query, targetHandler, timeout, execution); - mergeResult(query, targetHandler, mergedResults, result); - } - - private Results<SearchChainInvocationSpec, UnresolvedSearchChainException> getTargets(Set<String> sources, Properties properties, IndexFacts indexFacts) { return sources.isEmpty() ? defaultSearchChains(properties): @@ -845,11 +487,10 @@ public class FederationSearcher extends ForkingSearcher { return result.build(); } - public Results<SearchChainInvocationSpec, UnresolvedSearchChainException> defaultSearchChains(Properties sourceToProviderMap) { Results.Builder<SearchChainInvocationSpec, UnresolvedSearchChainException> result = new Builder<>(); - for (Target target : searchChainResolver.defaultTargets()) { + for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.defaultTargets()) { try { result.addData(target.responsibleSearchChain(sourceToProviderMap)); } catch (UnresolvedSearchChainException e) { @@ -865,66 +506,70 @@ public class FederationSearcher extends ForkingSearcher { try { return new ComponentSpecification(source); } catch(Exception e) { - throw new IllegalArgumentException("The source ref '" + source - + "' used for federation is not valid.", e); + throw new IllegalArgumentException("The source ref '" + source + "' used for federation is not valid.", e); } } - @Override - public Result search(Query query, Execution execution) { - Result mergedResults = execution.search(query); - - Results<SearchChainInvocationSpec, UnresolvedSearchChainException> targets = - getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts()); - warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits()); - - Collection<SearchChainInvocationSpec> prunedTargets = - pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data()); - - Results<TargetHandler, ErrorMessage> regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry()); - query.errors().addAll(regularTargetHandlers.errors()); - - Set<TargetHandler> targetHandlers = new LinkedHashSet<>(regularTargetHandlers.data()); - targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector)); - - long targetsTimeout = calculateTimeout(query, targetHandlers); - if (targetsTimeout < 0) - return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate")); - - traceTargets(query, targetHandlers); - - if (targetHandlers.size() == 0) { - return mergedResults; - } else if (targetHandlers.size() == 1 && - ! shouldExecuteTargetLongerThanThread(query, targetHandlers.iterator().next())) { - TargetHandler chain = first(targetHandlers); - searchSingleTarget(query, mergedResults, chain, targetsTimeout, execution); - } else { - searchMultipleTargets(query, mergedResults, targetHandlers, targetsTimeout, execution); - } - - return mergedResults; - } - - private void traceTargets(Query query, Collection<TargetHandler> targetHandlers) { + private void traceTargets(Query query, Collection<Target> targets) { int traceFederationLevel = 2; if ( ! query.isTraceable(traceFederationLevel)) return; - query.trace("Federating to " + targetHandlers, traceFederationLevel); + query.trace("Federating to " + targets, traceFederationLevel); } /** * Returns true if we are requested to keep executing a target longer than we're waiting for it. * This is useful to populate caches inside targets. */ - private boolean shouldExecuteTargetLongerThanThread(Query query, TargetHandler target) { + private boolean shouldExecuteTargetLongerThanThread(Query query, Target target) { return target.federationOptions().getRequestTimeoutInMilliseconds() > query.getTimeout(); } - private static Results<TargetHandler, ErrorMessage> resolveSearchChains( - Collection<SearchChainInvocationSpec> prunedTargets, - SearchChainRegistry registry) { + private void addSearchChainTimedOutError(Query query, ComponentId searchChainId) { + ErrorMessage timeoutMessage = ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out."); + timeoutMessage.setSource(searchChainId.stringValue()); + query.errors().add(timeoutMessage); + } - Results.Builder<TargetHandler, ErrorMessage> targetHandlers = new Results.Builder<>(); + private void mergeResult(Query query, Target target, Result mergedResults, Result result) { + target.modifyTargetResult(result); + ComponentId searchChainId = target.getId(); + Chain<Searcher> searchChain = target.getChain(); + + mergedResults.mergeWith(result); + HitGroup group = result.hits(); + group.setId("source:" + searchChainId.getName()); + + group.setSearcherSpecificMetaData(this, searchChain); + group.setMeta(false); // Set hit groups as non-meta as a default + group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count + group.setSource(searchChainId.getName()); + group.setQuery(result.getQuery()); + + for (Iterator<Hit> it = group.unorderedDeepIterator(); it.hasNext();) { + Hit hit = it.next(); + hit.setSearcherSpecificMetaData(this, searchChain); + hit.setSource(searchChainId.stringValue()); + + // This is the backend request meta hit, that is holding logging information + // See HTTPBackendSearcher, where this hit is created + if (hit.isMeta() && hit.types().contains("logging")) { + // Augment this hit with count fields + hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount()); + hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount()); + int offset = result.getQuery().getOffset(); + hit.setField(LOG_COUNT_PREFIX + "first", offset + 1); + hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset); + } + + } + if (query.getTraceLevel()>=4) + query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4); + mergedResults.hits().add(group); + } + + private Results<Target, ErrorMessage> resolveSearchChains(Collection<SearchChainInvocationSpec> prunedTargets, + SearchChainRegistry registry) { + Results.Builder<Target, ErrorMessage> targetHandlers = new Results.Builder<>(); for (SearchChainInvocationSpec target: prunedTargets) { Chain<Searcher> chain = registry.getChain(target.searchChainId); @@ -932,19 +577,19 @@ public class FederationSearcher extends ForkingSearcher { targetHandlers.addError(ErrorMessage.createIllegalQuery("Could not find search chain '" + target.searchChainId + "'")); } else { - targetHandlers.addData(new StandardTargetHandler(target, chain)); + targetHandlers.addData(new StandardTarget(target, chain)); } } return targetHandlers.build(); } - private static <T> List<TargetHandler> getAdditionalTargets(Query query, Execution execution, TargetSelector<T> targetSelector) { + private static <T> List<Target> getAdditionalTargets(Query query, Execution execution, TargetSelector<T> targetSelector) { if (targetSelector == null) return Collections.emptyList(); - ArrayList<TargetHandler> result = new ArrayList<>(); + ArrayList<Target> result = new ArrayList<>(); for (FederationTarget<T> target: targetSelector.getTargets(query, execution.searchChainRegistry())) - result.add(new CustomTargetHandler<>(targetSelector, target)); + result.add(new CustomTarget<>(targetSelector, target)); return result; } @@ -971,4 +616,227 @@ public class FederationSearcher extends ForkingSearcher { return false; } + /** A map from a unique search chain and query instance to a result */ + private static class UniqueExecutionsToResults { + + /** Implemented as a nested identity hashmap */ + final Map<Chain<Searcher>,Map<Query,Result>> resultsToFill = new IdentityHashMap<>(); + + /** Returns a result to fill for a query and chain, by creating it if necessary */ + public Result get(Chain<Searcher> chain, Query query) { + Map<Query,Result> resultsToFillForAChain = resultsToFill.get(chain); + if (resultsToFillForAChain == null) { + resultsToFillForAChain = new IdentityHashMap<>(); + resultsToFill.put(chain,resultsToFillForAChain); + } + + Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query); + if (resultsToFillForAChainAndQuery == null) { + resultsToFillForAChainAndQuery = new Result(query); + resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery); + } + + return resultsToFillForAChainAndQuery; + } + + } + + /** A target for federation, containing a chain to which a federation query can be forwarded. */ + static abstract class Target { + + abstract Chain<Searcher> getChain(); + abstract void modifyTargetQuery(Query query); + abstract void modifyTargetResult(Result result); + + ComponentId getId() { + return getChain().getId(); + } + + public abstract FederationOptions federationOptions(); + + @Override + public String toString() { return getChain().getId().stringValue(); } + + } + + /** + * A handler representing a target created by the federation logic. + * This is a value object, to ensure that identical target invocations are not invoked multiple times. + */ + private static class StandardTarget extends Target { + + private final SearchChainInvocationSpec target; + private final Chain<Searcher> chain; + + public StandardTarget(SearchChainInvocationSpec target, Chain<Searcher> chain) { + this.target = target; + this.chain = chain; + } + + @Override + Chain<Searcher> getChain() { return chain; } + + @Override + void modifyTargetQuery(Query query) {} + @Override + void modifyTargetResult(Result result) {} + + @Override + public FederationOptions federationOptions() { return target.federationOptions; } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! ( o instanceof StandardTarget)) return false; + + StandardTarget other = (StandardTarget)o; + if ( ! Objects.equals(other.chain.getId(), this.chain.getId())) return false; + if ( ! Objects.equals(other.target, this.target)) return false; + return true; + } + + @Override + public int hashCode() { return Objects.hash(chain.getId(), target); } + + } + + /** A target handler where the target generation logic is delegated to the application provided target selector */ + private static class CustomTarget<T> extends Target { + + private final TargetSelector<T> selector; + private final FederationTarget<T> target; + + CustomTarget(TargetSelector<T> selector, FederationTarget<T> target) { + this.selector = selector; + this.target = target; + } + + @Override + Chain<Searcher> getChain() { + return target.getChain(); + } + + @Override + public void modifyTargetQuery(Query query) { + selector.modifyTargetQuery(target, query); + } + + @Override + public void modifyTargetResult(Result result) { + selector.modifyTargetResult(target, result); + } + + @Override + public FederationOptions federationOptions() { + return target.getFederationOptions(); + } + + } + + private static class CompoundKey { + + private final String sourceName; + private final String propertyName; + + CompoundKey(String sourceName, String propertyName) { + this.sourceName = sourceName; + this.propertyName = propertyName; + } + + @Override + public int hashCode() { + return sourceName.hashCode() ^ propertyName.hashCode(); + } + + @Override + public boolean equals(Object o) { + CompoundKey rhs = (CompoundKey) o; + return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName); + } + + @Override + public String toString() { + return sourceName + '.' + propertyName; + } + } + + private static class SourceKey extends CompoundKey { + + public static final String SOURCE = "source."; + + SourceKey(String sourceName, String propertyName) { + super(sourceName, propertyName); + } + + @Override + public int hashCode() { + return super.hashCode() ^ 7; + } + + @Override + public boolean equals(Object o) { + return (o instanceof SourceKey) && super.equals(o); + } + + @Override + public String toString() { + return SOURCE + super.toString(); + } + } + + private static class ProviderKey extends CompoundKey { + + public static final String PROVIDER = "provider."; + + ProviderKey(String sourceName, String propertyName) { + super(sourceName, propertyName); + } + + @Override + public int hashCode() { + return super.hashCode() ^ 17; + } + + @Override + public boolean equals(Object o) { + return (o instanceof ProviderKey) && super.equals(o); + } + + @Override + public String toString() { + return PROVIDER + super.toString(); + } + + } + + private static class Window { + + private final int hits; + private final int offset; + + public Window(int hits, int offset) { + this.hits = hits; + this.offset = offset; + } + + public Integer get(CompoundName parameterName) { + if (parameterName.equals(Query.HITS)) return hits; + if (parameterName.equals(Query.OFFSET)) return offset; + return null; + } + + public static Window from(Query query) { + return new Window(query.getHits(), query.getOffset()); + } + + + public static Window from(Collection<Target> targets, Query query) { + if (targets.size() == 1) // preserve requested top-level offsets + return Window.from(query); + else // request from offset 0 to enable correct upstream blending into a single top-level hit list + return new Window(query.getHits() + query.getOffset(), 0); + } + + } + } diff --git a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java index 7e8f3553cab..0c70abbf570 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java @@ -21,7 +21,7 @@ import com.yahoo.search.searchchain.Execution; * * @see FederationSearcher * @since 5.0.13 - * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author Steinar Knutsen */ @After("*") public class ForwardingSearcher extends PingableSearcher { diff --git a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java deleted file mode 100644 index 52cd5397489..00000000000 --- a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.federation; - -import com.yahoo.search.searchchain.FutureResult; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * @author tonytv - */ -class FutureWaiter { - private class Future { - final FutureResult result; - final long timeoutInMilliseconds; - - public Future(FutureResult result, long timeoutInMilliseconds) { - this.result = result; - this.timeoutInMilliseconds = timeoutInMilliseconds; - } - } - - private List<Future> futures = new ArrayList<>(); - - public void add(FutureResult futureResult, long timeoutInMilliseconds) { - futures.add(new Future(futureResult, timeoutInMilliseconds)); - } - - public void waitForFutures() { - sortFuturesByTimeoutDescending(); - - final long startTime = System.currentTimeMillis(); - - for (Future future : futures) { - long timeToWait = startTime + future.timeoutInMilliseconds - System.currentTimeMillis(); - if (timeToWait <= 0) - break; - - future.result.get(timeToWait, TimeUnit.MILLISECONDS); - } - } - - private void sortFuturesByTimeoutDescending() { - Collections.sort(futures, new Comparator<Future>() { - @Override - public int compare(Future lhs, Future rhs) { - return -compareLongs(lhs.timeoutInMilliseconds, rhs.timeoutInMilliseconds); - } - - private int compareLongs(long lhs, long rhs) { - return new Long(lhs).compareTo(rhs); - } - }); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java index e1794a73a93..739337add14 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java @@ -40,7 +40,7 @@ import java.util.concurrent.*; * </p> * * @see com.yahoo.search.searchchain.Execution - * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + * @author Arne Bergene Fossaa */ public class AsyncExecution { @@ -174,9 +174,8 @@ public class AsyncExecution { * collection */ public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) { - // Copy the list in case it is modified while we are waiting - final List<FutureResult> workingTasks = new ArrayList<>(tasks); + List<FutureResult> workingTasks = new ArrayList<>(tasks); try { runTask(() -> { for (FutureResult task : workingTasks) @@ -186,15 +185,13 @@ public class AsyncExecution { // Handle timeouts below } - final List<Result> results = new ArrayList<>(tasks.size()); + List<Result> results = new ArrayList<>(tasks.size()); for (FutureResult atask : workingTasks) { Result result; if (atask.isDone() && !atask.isCancelled()) { - result = atask.get(); // Since isDone() = true, this won't - // block. + result = atask.get(); // Since isDone() = true, this won't block. } else { // Not done and no errors thrown - result = new Result(atask.getQuery(), - atask.createTimeoutError()); + result = new Result(atask.getQuery(), atask.createTimeoutError()); } results.add(result); } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java index 877252f07e6..ec87305da3b 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.searchchain; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; @@ -16,6 +17,8 @@ import com.yahoo.search.result.ErrorMessage; /** * Extends a {@code FutureTask<Result>}, with some added error handling + * + * @author bratseth */ public class FutureResult extends FutureTask<Result> { @@ -26,51 +29,56 @@ public class FutureResult extends FutureTask<Result> { private final static Logger log = Logger.getLogger(FutureResult.class.getName()); - FutureResult(Callable<Result> callable, Execution execution, final Query query) { + protected FutureResult(Callable<Result> callable, Execution execution, Query query) { super(callable); this.query = query; this.execution = execution; } + /** + * Returns a Result containing the hits returned from this source, or an error otherwise. + * This will block for however long it takes to get the result: Using this is a bad idea. + */ @Override public Result get() { - Result result; try { - result = super.get(); + return super.get(); } catch (InterruptedException e) { - result = new Result(getQuery(), ErrorMessage.createUnspecifiedError( - "'" + execution + "' was interrupted while executing: " + Exceptions.toMessageString(e))); + return new Result(getQuery(), createInterruptedError(e)); } catch (ExecutionException e) { - log.log(Level.WARNING,"Exception on executing " + execution + " for " + query,e); - result = new Result(getQuery(), ErrorMessage.createErrorInPluginSearcher( - "Error in '" + execution + "': " + Exceptions.toMessageString(e), - e.getCause())); + return new Result(getQuery(), createExecutionError(e)); } - return result; } + /** + * Returns a Result containing the hits returned from this source, or an error otherwise. + * This blocks for at most the given timeout and returns a Result containing a timeout error + * if the result is not available within this time. + */ @Override public Result get(long timeout, TimeUnit timeunit) { - Result result; + return getIfAvailable(timeout, timeunit).orElse(new Result(getQuery(), createTimeoutError())); + } + + /** + * Same as get(timeout, timeunit) but returns Optiona.empty instead of a result with error if the result is + * not available in time + */ + public Optional<Result> getIfAvailable(long timeout, TimeUnit timeunit) { try { - result = super.get(timeout, timeunit); + return Optional.of(super.get(timeout, timeunit)); } catch (InterruptedException e) { - result = new Result(getQuery(), ErrorMessage.createUnspecifiedError( - "'" + execution + "' was interrupted while executing: " + Exceptions.toMessageString(e))); + return Optional.of(new Result(getQuery(), createInterruptedError(e))); } catch (ExecutionException e) { - log.log(Level.WARNING,"Exception on executing " + execution + " for " + query, e); - result = new Result(getQuery(), ErrorMessage.createErrorInPluginSearcher( - "Error in '" + execution + "': " + Exceptions.toMessageString(e), - e.getCause())); + return Optional.of(new Result(getQuery(), createExecutionError(e))); } catch (TimeoutException e) { - result = new Result(getQuery(), createTimeoutError()); + return Optional.empty(); } - return result; } /** Returns the query used in this execution, never null */ @@ -78,9 +86,19 @@ public class FutureResult extends FutureTask<Result> { return query; } - ErrorMessage createTimeoutError() { - return ErrorMessage.createTimeout( - "Error executing '" + execution + "': " + " Chain timed out."); + private ErrorMessage createInterruptedError(Exception e) { + return ErrorMessage.createUnspecifiedError("'" + execution + "' was interrupted while executing: " + + Exceptions.toMessageString(e)); + } + + private ErrorMessage createExecutionError(Exception e) { + log.log(Level.WARNING,"Exception on executing " + execution + " for " + query,e); + return ErrorMessage.createErrorInPluginSearcher("Error in '" + execution + "': " + Exceptions.toMessageString(e), + e.getCause()); + } + ErrorMessage createTimeoutError() { + return ErrorMessage.createTimeout("Error executing '" + execution + "': " + " Chain timed out."); } + } diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java b/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java index 73e35046dd4..67753026f44 100644 --- a/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java +++ b/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java @@ -83,9 +83,7 @@ public class FederationOptions implements Cloneable { } public long getSearchChainExecutionTimeoutInMilliseconds(long queryTimeout) { - return getTimeoutInMilliseconds() >= 0 ? - getTimeoutInMilliseconds() : - queryTimeout; + return getTimeoutInMilliseconds() >= 0 ? getTimeoutInMilliseconds() : queryTimeout; } public boolean getUseByDefault() { diff --git a/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java b/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java new file mode 100644 index 00000000000..eb17cf27db9 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java @@ -0,0 +1,146 @@ +package com.yahoo.search.federation; + +import com.google.common.collect.ImmutableSet; +import com.yahoo.component.chain.Chain; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.Searcher; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.searchchain.Execution; +import com.yahoo.search.searchchain.FutureResult; +import com.yahoo.search.searchchain.model.federation.FederationOptions; +import org.junit.Test; + +import java.time.Clock; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +/** + * @author bratseth + */ +public class FederationResultTest { + + private static final FederationSearcher.Target organic = new MockTarget("organic", 250); + private static final FederationSearcher.Target dsp1 = new MockTarget("dsp1", 120); + private static final FederationSearcher.Target dsp2 = new MockTarget("dsp2", 100); + + private final Clock clock = Clock.systemUTC(); + + @Test + public void testFederationResult() { + assertTimeout(ImmutableSet.of(), 50, 100, 90); + assertTimeout(ImmutableSet.of(), 240, 200, 200); + assertTimeout(ImmutableSet.of("dsp1"), 130, 140, 110); + assertTimeout(ImmutableSet.of("organic"), 260, 80, 80); + assertTimeout(ImmutableSet.of("dsp2"), 100, 110, 115); + assertTimeout(ImmutableSet.of(), 100, 110, 105); + assertTimeout(ImmutableSet.of("dsp1", "dsp2"), 100, 130, 130); + assertTimeout(ImmutableSet.of("organic"), 260, 130, 130); + } + + private void assertTimeout(Set<String> expectedTimeoutNames, int ... responseTimes) { + FederationResult.Builder builder = new FederationResult.Builder(); + builder.add(organic, resultAfter(responseTimes[0])); + builder.add(dsp1, resultAfter(responseTimes[1])); + builder.add(dsp2, resultAfter(responseTimes[2])); + FederationResult federationResult = builder.build(); + federationResult.waitForAll(50, clock); + assertEquals(3, federationResult.all().size()); + for (FederationResult.TargetResult targetResult : federationResult.all()) { + Result result = targetResult.getOrTimeoutError(); + if (expectedTimeoutNames.contains(targetResult.target.getId().toString())) + assertTrue(targetResult.target.getId() + " timed out", timedOut(result)); + else + assertTrue(targetResult.target.getId() + " did not time out", ! timedOut(result)); + } + } + + private MockFutureResult resultAfter(int time) { + return new MockFutureResult(new Query(), time); + } + + private boolean timedOut(Result result) { + ErrorMessage error = result.hits().getError(); + if (error == null) return false; + return error.getCode() == ErrorMessage.timeoutCode; + } + + private class MockFutureResult extends FutureResult { + + private final int responseTime; + private final Query query; + private final long startTime; + + MockFutureResult(Query query, int responseTime) { + super(() -> new Result(query), new Execution(Execution.Context.createContextStub()), query); + this.responseTime = responseTime; + this.query = query; + startTime = clock.millis(); + } + + @Override + public Result get() { throw new RuntimeException(); } + + @Override + public Optional<Result> getIfAvailable(long timeout, TimeUnit timeunit) { + if (timeunit != TimeUnit.MILLISECONDS) throw new RuntimeException(); + + long elapsedTime = clock.millis() - startTime; + long leftUntilResponse = responseTime - elapsedTime; + if (leftUntilResponse > timeout) { + sleepUntil(timeout); + return Optional.empty(); + } + else { + sleepUntil(leftUntilResponse); + return Optional.of(new Result(query)); + } + } + + private void sleepUntil(long time) { + if (time <= 0) return; + try { + Thread.sleep(time); + } + catch (InterruptedException e) { + } + } + + @Override + public Query getQuery() { + return query; + } + + } + + private static class MockTarget extends FederationSearcher.Target { + + private final Chain<Searcher> chain; + private final int timeout; + + MockTarget(String id, int timeout) { + this.chain = new Chain<>(id); + this.timeout = timeout; + } + + @Override + Chain<Searcher> getChain() { return chain; } + + @Override + void modifyTargetQuery(Query query) { } + + @Override + void modifyTargetResult(Result result) { } + + @Override + public FederationOptions federationOptions() { + return new FederationOptions(false, timeout, true); + } + + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java b/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java deleted file mode 100644 index 37969e12399..00000000000 --- a/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.federation; - - -/** - * @author tonytv - */ -// TODO: Fix or remove! -public class FutureWaiterTest { - -/* - - @MockClass(realClass = System.class) - public static class MockSystem { - - private static long currentTime; - private static boolean firstTime; - - private static final long startTime = 123; - - @Mock - public static synchronized long currentTimeMillis() { - if (firstTime) { - firstTime = false; - return startTime; - } - return currentTime; - } - - static synchronized void setElapsedTime(long elapsedTime) { - firstTime = true; - currentTime = elapsedTime + startTime; - } - } - - @Mocked() - FutureResult result1; - - @Mocked() - FutureResult result2; - - @Mocked() - FutureResult result3; - - @Mocked() - FutureResult result4; - - @Before - public void before() { - Mockit.setUpMock(FutureWaiterTest.MockSystem.class); - } - - @After - public void after() { - Mockit.tearDownMocks(); - } - - @Test - public void require_time_to_wait_is_adjusted_for_elapsed_time() { - MockSystem.setElapsedTime(300); - - FutureWaiter futureWaiter = new FutureWaiter(); - futureWaiter.add(result1, 350); - futureWaiter.waitForFutures(); - - new FullVerifications() { - { - result1.get(350 - 300, TimeUnit.MILLISECONDS); - } - }; - } - - @Test - public void require_do_not_wait_for_expired_timeouts() { - MockSystem.setElapsedTime(300); - - FutureWaiter futureWaiter = new FutureWaiter(); - futureWaiter.add(result1, 300); - futureWaiter.add(result2, 290); - - futureWaiter.waitForFutures(); - - new FullVerifications() { - {} - }; - } - - @Test - public void require_wait_for_largest_timeout_first() throws InterruptedException { - MockSystem.setElapsedTime(600); - - FutureWaiter futureWaiter = new FutureWaiter(); - futureWaiter.add(result1, 500); - futureWaiter.add(result4, 800); - futureWaiter.add(result2, 600); - futureWaiter.add(result3, 700); - - futureWaiter.waitForFutures(); - - new FullVerifications() { - { - result4.get(800 - 600, TimeUnit.MILLISECONDS); - result3.get(700 - 600, TimeUnit.MILLISECONDS); - } - }; - } - - */ -} diff --git a/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java b/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java index e4176bb6679..11fae387739 100644 --- a/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java @@ -38,7 +38,7 @@ import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceP * Test for federation searcher. The searcher is also tested in * com.yahoo.prelude.searcher.test.BlendingSearcherTestCase. * - * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + * @author Arne Bergene Fossaa */ @SuppressWarnings("deprecation") public class FederationSearcherTestCase { diff --git a/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java b/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java index 7b0451a01ba..7aa73c64d3a 100644 --- a/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java +++ b/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java @@ -18,6 +18,7 @@ import java.util.Collections; * @author tonytv */ class FederationTester { + SearchChainResolver.Builder builder = new SearchChainResolver.Builder(); SearchChainRegistry registry = new SearchChainRegistry(); |