summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FederationResult.java134
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java842
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java58
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java64
-rw-r--r--container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java4
-rw-r--r--container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java146
-rw-r--r--container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java109
-rw-r--r--container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java2
-rw-r--r--container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java1
11 files changed, 685 insertions, 690 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
new file mode 100644
index 00000000000..a7983447a81
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
@@ -0,0 +1,134 @@
+package com.yahoo.search.federation;
+
+import com.google.common.collect.ImmutableList;
+import com.yahoo.search.Result;
+import com.yahoo.search.searchchain.FutureResult;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * The result of a federation to targets which knows how to wait for the result from each target.
+ * This thread handles multiple threads producing target results but only a single thread may use an instance of this.
+ *
+ * @author bratseth
+ */
+class FederationResult {
+
+ /** All targets of this */
+ private final List<TargetResult> targetResults;
+
+ /**
+ * The remaining targets to wait for.
+ * Other targets are either complete, or should only be included if they are available when we complete
+ */
+ private List<TargetResult> targetsToWaitFor;
+
+ private FederationResult(ImmutableList<TargetResult> targetResults) {
+ this.targetResults = targetResults;
+
+ if (targetResults.stream().anyMatch(TargetResult::isMandatory))
+ targetsToWaitFor = targetResults.stream().filter(TargetResult::isMandatory).collect(Collectors.toList());
+ else
+ targetsToWaitFor = new ArrayList<>(targetResults);
+ }
+
+ /**
+ * Wait on each target for that targets timeout
+ * On the worst case this is the same as waiting for the max target timeout,
+ * in the average case it may be much better because lower timeout sources do not get to
+ * drive the timeout above their own timeout value.
+ * When this completes, results can be accessed from the TargetResults with no blocking
+ * (i.e getOrTimeout) without breaking any contract.
+ */
+ public void waitForAll(int queryTimeout, Clock clock) {
+ long startTime = clock.millis();
+ while ( ! targetsToWaitFor.isEmpty()) {
+ TargetResult nextToWaitFor = targetWithSmallestTimeout(targetsToWaitFor, queryTimeout);
+ long timeLeftOfNextTimeout = nextToWaitFor.timeout(queryTimeout) - ( clock.millis() - startTime );
+ nextToWaitFor.getIfAvailable(timeLeftOfNextTimeout);
+ targetsToWaitFor.remove(nextToWaitFor);
+ }
+ }
+
+ /** Returns an immutable list of the results of this */
+ public List<TargetResult> all() { return targetResults; }
+
+ private TargetResult targetWithSmallestTimeout(List<TargetResult> results, int queryTimeout) {
+ TargetResult smallest = null;
+ for (TargetResult result : results) {
+ if (smallest == null || result.timeout(queryTimeout) < smallest.timeout(queryTimeout))
+ smallest = result;
+ }
+ return smallest;
+ }
+
+ static class TargetResult {
+
+ final FederationSearcher.Target target;
+ private final FutureResult futureResult;
+
+ /**
+ * Single threaded access to result already returned from futureResult, if any.
+ * To avoid unnecessary synchronization with the producer thread.
+ */
+ private Optional<Result> availableResult = Optional.empty();
+
+ private TargetResult(FederationSearcher.Target target, FutureResult futureResult) {
+ this.target = target;
+ this.futureResult = futureResult;
+ }
+
+ private boolean isMandatory() { return ! target.federationOptions().getOptional(); }
+
+ /**
+ * Returns the result of this by blocking until timeout if necessary.
+ *
+ * @return the result if available, or null otherwise
+ */
+ public Optional<Result> getIfAvailable(long timeout) {
+ if (availableResult.isPresent()) return availableResult;
+ availableResult = futureResult.getIfAvailable(timeout, TimeUnit.MILLISECONDS);
+ return availableResult;
+ }
+
+ /** Returns a result without blocking; if the result is not available one with a timeout error is produced */
+ public Result getOrTimeoutError() {
+ // The else part is to offload creation of the timeout error
+ return getIfAvailable(0).orElse(futureResult.get(0, TimeUnit.MILLISECONDS));
+ }
+
+ public boolean successfullyCompleted() {
+ return futureResult.isDone() && ! futureResult.isCancelled();
+ }
+
+ private int timeout(long queryTimeout) {
+ return (int)target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
+ }
+
+ @Override
+ public String toString() {
+ return "result for " + target;
+ }
+
+ }
+
+ public static class Builder {
+
+ private final ImmutableList.Builder<TargetResult> results = new ImmutableList.Builder();
+
+ public void add(FederationSearcher.Target target, FutureResult futureResult) {
+ results.add(new TargetResult(target, futureResult));
+ }
+
+ public FederationResult build() {
+ return new FederationResult(results.build());
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
index 9c94fd04ea5..621f0fbe090 100644
--- a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
@@ -22,7 +22,6 @@ import com.yahoo.search.federation.sourceref.SearchChainResolver;
import com.yahoo.search.federation.sourceref.SingleTarget;
import com.yahoo.search.federation.sourceref.SourceRefResolver;
import com.yahoo.search.federation.sourceref.SourcesTarget;
-import com.yahoo.search.federation.sourceref.Target;
import com.yahoo.search.federation.sourceref.UnresolvedSearchChainException;
import com.yahoo.search.query.Properties;
import com.yahoo.search.query.properties.QueryProperties;
@@ -46,6 +45,7 @@ import static com.yahoo.collections.CollectionUtil.first;
import static com.yahoo.container.util.Util.quote;
import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceProperties;
+import java.time.Clock;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
@@ -54,8 +54,9 @@ import java.util.logging.Logger;
/**
* This searcher takes a set of sources, looks them up in config and fire off the correct searchchains.
*
- * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author Arne Bergene Fossaa
* @author tonytv
+ * @author bratseth
*/
@Provides(FederationSearcher.FEDERATION)
@After("*")
@@ -63,195 +64,12 @@ public class FederationSearcher extends ForkingSearcher {
public static final String FEDERATION = "Federation";
- /** A target for federation, containing a chain to which a federation query can be forwarded. */
- private static abstract class TargetHandler {
-
- abstract Chain<Searcher> getChain();
- abstract void modifyTargetQuery(Query query);
- abstract void modifyTargetResult(Result result);
-
- ComponentId getId() {
- return getChain().getId();
- }
-
- public abstract FederationOptions federationOptions();
-
- @Override
- public String toString() { return getChain().getId().stringValue(); }
-
- }
-
- /**
- * A handler representing a target created by the federation logic.
- * This is a value object, to ensure that identical target invocations are not invoked multiple times.
- */
- private static class StandardTargetHandler extends TargetHandler {
-
- private final SearchChainInvocationSpec target;
- private final Chain<Searcher> chain;
-
- public StandardTargetHandler(SearchChainInvocationSpec target, Chain<Searcher> chain) {
- this.target = target;
- this.chain = chain;
- }
-
- @Override
- Chain<Searcher> getChain() { return chain; }
-
- @Override
- void modifyTargetQuery(Query query) {}
- @Override
- void modifyTargetResult(Result result) {}
-
- @Override
- public FederationOptions federationOptions() { return target.federationOptions; }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) return true;
- if ( ! ( o instanceof StandardTargetHandler)) return false;
-
- StandardTargetHandler other = (StandardTargetHandler)o;
- if ( ! Objects.equals(other.chain.getId(), this.chain.getId())) return false;
- if ( ! Objects.equals(other.target, this.target)) return false;
- return true;
- }
-
- @Override
- public int hashCode() { return Objects.hash(chain.getId(), target); }
-
- }
-
- /** A target handler where the target generation logic is delegated to the application provided target selector */
- private static class CustomTargetHandler<T> extends TargetHandler {
-
- private final TargetSelector<T> selector;
- private final FederationTarget<T> target;
-
- CustomTargetHandler(TargetSelector<T> selector, FederationTarget<T> target) {
- this.selector = selector;
- this.target = target;
- }
-
- @Override
- Chain<Searcher> getChain() {
- return target.getChain();
- }
-
- @Override
- public void modifyTargetQuery(Query query) {
- selector.modifyTargetQuery(target, query);
- }
-
- @Override
- public void modifyTargetResult(Result result) {
- selector.modifyTargetResult(target, result);
- }
-
- @Override
- public FederationOptions federationOptions() {
- return target.getFederationOptions();
- }
-
- }
-
- private static class ExecutionInfo {
-
- private final TargetHandler targetHandler;
- private final FederationOptions federationOptions;
- private final FutureResult futureResult;
-
- public ExecutionInfo(TargetHandler targetHandler, FederationOptions federationOptions, FutureResult futureResult) {
- this.targetHandler = targetHandler;
- this.federationOptions = federationOptions;
- this.futureResult = futureResult;
- }
-
- }
-
- private static class CompoundKey {
-
- private final String sourceName;
- private final String propertyName;
-
- CompoundKey(String sourceName, String propertyName) {
- this.sourceName = sourceName;
- this.propertyName = propertyName;
- }
-
- @Override
- public int hashCode() {
- return sourceName.hashCode() ^ propertyName.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- CompoundKey rhs = (CompoundKey) o;
- return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName);
- }
-
- @Override
- public String toString() {
- return sourceName + '.' + propertyName;
- }
- }
-
- private static class SourceKey extends CompoundKey {
-
- public static final String SOURCE = "source.";
-
- SourceKey(String sourceName, String propertyName) {
- super(sourceName, propertyName);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode() ^ 7;
- }
-
- @Override
- public boolean equals(Object o) {
- return (o instanceof SourceKey) && super.equals(o);
- }
-
- @Override
- public String toString() {
- return SOURCE + super.toString();
- }
- }
-
- private static class ProviderKey extends CompoundKey {
-
- public static final String PROVIDER = "provider.";
-
- ProviderKey(String sourceName, String propertyName) {
- super(sourceName, propertyName);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode() ^ 17;
- }
-
- @Override
- public boolean equals(Object o) {
- return (o instanceof ProviderKey) && super.equals(o);
- }
-
- @Override
- public String toString() {
- return PROVIDER + super.toString();
- }
-
- }
-
private static final Logger log = Logger.getLogger(FederationSearcher.class.getName());
/** The name of the query property containing the source name added to the query to each source by this */
public final static CompoundName SOURCENAME = new CompoundName("sourceName");
public final static CompoundName PROVIDERNAME = new CompoundName("providerName");
-
/** Logging field name constants */
public static final String LOG_COUNT_PREFIX = "count_";
@@ -263,6 +81,7 @@ public class FederationSearcher extends ForkingSearcher {
private final boolean strictSearchchain;
private final TargetSelector<?> targetSelector;
+ private final Clock clock = Clock.systemUTC();
@Inject
public FederationSearcher(FederationConfig config, StrictContractsConfig strict,
@@ -274,7 +93,6 @@ public class FederationSearcher extends ForkingSearcher {
private static TargetSelector resolveSelector(String selectorId,
ComponentRegistry<TargetSelector> targetSelectors) {
if (selectorId.isEmpty()) return null;
-
return checkNotNull(targetSelectors.getComponent(selectorId),
"Missing target selector with id" + quote(selectorId));
}
@@ -294,7 +112,6 @@ public class FederationSearcher extends ForkingSearcher {
this.targetSelector = targetSelector;
}
-
private static SearchChainResolver createResolver(FederationConfig config) {
SearchChainResolver.Builder builder = new SearchChainResolver.Builder();
@@ -345,165 +162,89 @@ public class FederationSearcher extends ForkingSearcher {
setRequestTimeoutInMilliseconds(searchChain.requestTimeoutMillis());
}
- private static long calculateTimeout(Query query, Collection<TargetHandler> targets) {
-
- class PartitionByOptional {
- final List<TargetHandler> mandatoryTargets;
- final List<TargetHandler> optionalTargets;
+ @Override
+ public Result search(Query query, Execution execution) {
+ Result mergedResults = execution.search(query);
- PartitionByOptional(Collection<TargetHandler> targets) {
- List<TargetHandler> mandatoryTargets = new ArrayList<>();
- List<TargetHandler> optionalTargets = new ArrayList<>();
+ Results<SearchChainInvocationSpec, UnresolvedSearchChainException> targets =
+ getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts());
+ warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits());
- for (TargetHandler target : targets) {
- if (target.federationOptions().getOptional()) {
- optionalTargets.add(target);
- } else {
- mandatoryTargets.add(target);
- }
- }
+ Collection<SearchChainInvocationSpec> prunedTargets =
+ pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data());
- this.mandatoryTargets = Collections.unmodifiableList(mandatoryTargets);
- this.optionalTargets = Collections.unmodifiableList(optionalTargets);
- }
- }
+ Results<Target, ErrorMessage> regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry());
+ query.errors().addAll(regularTargetHandlers.errors());
- if (query.requestHasProperty("timeout") || targets.isEmpty()) {
- return query.getTimeLeft();
- } else {
- PartitionByOptional partition = new PartitionByOptional(targets);
- long queryTimeout = query.getTimeout();
+ Set<Target> targetHandlers = new LinkedHashSet<>(regularTargetHandlers.data());
+ targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector));
- return partition.mandatoryTargets.isEmpty() ?
- maximumTimeout(partition.optionalTargets, queryTimeout) :
- maximumTimeout(partition.mandatoryTargets, queryTimeout);
- }
- }
+ traceTargets(query, targetHandlers);
- private static long maximumTimeout(List<TargetHandler> invocationSpecs, long queryTimeout) {
- long timeout = 0;
- for (TargetHandler target : invocationSpecs) {
- timeout = Math.max(timeout,
- target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout));
- }
- return timeout;
+ if (targetHandlers.isEmpty())
+ return mergedResults;
+ else if (targetHandlers.size() > 1)
+ search(query, execution, targetHandlers, mergedResults);
+ else if (shouldExecuteTargetLongerThanThread(query, targetHandlers.iterator().next()))
+ search(query, execution, targetHandlers, mergedResults); // one target, but search in separate thread
+ else
+ search(query, execution, first(targetHandlers), mergedResults); // search in this thread
+ return mergedResults;
}
- private void addSearchChainTimedOutError(Query query,
- ComponentId searchChainId) {
- ErrorMessage timeoutMessage=
- ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out.");
- timeoutMessage.setSource(searchChainId.stringValue());
- query.errors().add(timeoutMessage);
+ private void search(Query query, Execution execution, Target target, Result mergedResults) {
+ Result result = search(query, execution, target);
+ mergeResult(query, target, mergedResults, result);
}
- private void mergeResult(Query query, TargetHandler targetHandler,
- Result mergedResults, Result result) {
-
-
- targetHandler.modifyTargetResult(result);
- final ComponentId searchChainId = targetHandler.getId();
- Chain<Searcher> searchChain = targetHandler.getChain();
+ private void search(Query query, Execution execution, Collection<Target> targets, Result mergedResults) {
+ FederationResult results = search(query, execution, targets);
+ results.waitForAll((int)query.getTimeLeft(), clock);
- mergedResults.mergeWith(result);
- HitGroup group = result.hits();
- group.setId("source:" + searchChainId.getName());
-
- group.setSearcherSpecificMetaData(this, searchChain);
- group.setMeta(false); // Set hit groups as non-meta as a default
- group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count
- group.setSource(searchChainId.getName());
- group.setQuery(result.getQuery());
-
- for (Iterator<Hit> it = group.unorderedDeepIterator(); it.hasNext();) {
- Hit hit = it.next();
- hit.setSearcherSpecificMetaData(this, searchChain);
- hit.setSource(searchChainId.stringValue());
-
- // This is the backend request meta hit, that is holding logging information
- // See HTTPBackendSearcher, where this hit is created
- if (hit.isMeta() && hit.types().contains("logging")) {
- // Augment this hit with count fields
- hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount());
- hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount());
- int offset = result.getQuery().getOffset();
- hit.setField(LOG_COUNT_PREFIX + "first", offset + 1);
- hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset);
+ HitOrderer s = null;
+ for (FederationResult.TargetResult targetResult : results.all()) {
+ if ( ! targetResult.successfullyCompleted()) {
+ addSearchChainTimedOutError(query, targetResult.target.getId());
+ } else {
+ if (s == null) {
+ s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), targetResult.getOrTimeoutError().hits().getOrderer());
+ }
+ mergeResult(query, targetResult.target, mergedResults, targetResult.getOrTimeoutError());
}
-
}
- if (query.getTraceLevel()>=4)
- query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4);
- mergedResults.hits().add(group);
}
- private boolean successfullyCompleted(FutureResult result) {
- return result.isDone() && !result.isCancelled();
- }
-
- private Query setupSingleQuery(Query query, long timeout, TargetHandler targetHandler) {
+ private Result search(Query query, Execution execution, Target target) {
+ long timeout = target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(query.getTimeLeft());
+ Execution newExecution = new Execution(target.getChain(), execution.context());
if (strictSearchchain) {
query.resetTimeout();
- return setupFederationQuery(query, query,
- windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler);
+ return newExecution.search(createFederationQuery(query, query, Window.from(query), timeout, target));
} else {
- return cloneFederationQuery(query,
- windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler);
+ return newExecution.search(cloneFederationQuery(query, Window.from(query), timeout, target));
}
}
- private Result startExecuteSingleQuery(Query query, TargetHandler chain, long timeout, Execution execution) {
- Query outgoing = setupSingleQuery(query, timeout, chain);
- Execution exec = new Execution(chain.getChain(), execution.context());
- return exec.search(outgoing);
- }
-
- private List<ExecutionInfo> startExecuteQueryForEachTarget(
- Query query, Collection<TargetHandler> targets, long timeout, Execution execution) {
-
- List<ExecutionInfo> results = new ArrayList<>();
-
- Map<String, Object> windowParameters;
- if (targets.size()==1) // preserve requested top-level offset by default as an optimization
- windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits(), query.getOffset()));
- else // request from offset 0 to enable correct upstream blending into a single top-level hit list
- windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits() + query.getOffset(), 0));
-
- for (TargetHandler targetHandler : targets) {
- long executeTimeout = timeout;
- if (targetHandler.federationOptions().getRequestTimeoutInMilliseconds() != -1)
- executeTimeout = targetHandler.federationOptions().getRequestTimeoutInMilliseconds();
- results.add(new ExecutionInfo(targetHandler, targetHandler.federationOptions(),
- createFutureSearch(query, windowParameters, targetHandler, executeTimeout, execution)));
- }
-
- return results;
- }
-
- private Map<String, Object> windowParameters(int hits, int offset) {
- Map<String, Object> params = new HashMap<>();
- params.put(Query.HITS.toString(), hits);
- params.put(Query.OFFSET.toString(), offset);
- return params;
+ private FederationResult search(Query query, Execution execution, Collection<Target> targets) {
+ FederationResult.Builder result = new FederationResult.Builder();
+ for (Target target : targets)
+ result.add(target, searchAsynchronously(query, execution, Window.from(targets, query), target));
+ return result.build();
}
- private FutureResult createFutureSearch(Query query, Map<String, Object> windowParameters, TargetHandler targetHandler,
- long timeout, Execution execution) {
- Query clonedQuery = cloneFederationQuery(query, windowParameters, timeout, targetHandler);
- return new AsyncExecution(targetHandler.getChain(), execution).search(clonedQuery);
+ private FutureResult searchAsynchronously(Query query, Execution execution, Window window, Target target) {
+ long timeout = target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(query.getTimeLeft());
+ Query clonedQuery = cloneFederationQuery(query, window, timeout, target);
+ return new AsyncExecution(target.getChain(), execution).search(clonedQuery);
}
-
- private Query cloneFederationQuery(Query query,
- Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) {
+ private Query cloneFederationQuery(Query query, Window window, long timeout, Target target) {
Query clonedQuery = Query.createNewQuery(query);
- return setupFederationQuery(query, clonedQuery, windowParameters, timeout, targetHandler);
+ return createFederationQuery(query, clonedQuery, window, timeout, target);
}
- private Query setupFederationQuery(Query query, Query outgoing,
- Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) {
-
- ComponentId chainId = targetHandler.getChain().getId();
+ private Query createFederationQuery(Query query, Query outgoing, Window window, long timeout, Target target) {
+ ComponentId chainId = target.getChain().getId();
String sourceName = chainId.getName();
outgoing.properties().set(SOURCENAME, sourceName);
@@ -516,31 +257,28 @@ public class FederationSearcher extends ForkingSearcher {
switch (propagateSourceProperties) {
case ALL:
- propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName,
+ propagatePerSourceQueryProperties(query, outgoing, window, sourceName, providerName,
QueryProperties.PER_SOURCE_QUERY_PROPERTIES);
break;
case OFFSET_HITS:
- propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName,
+ propagatePerSourceQueryProperties(query, outgoing, window, sourceName, providerName,
new CompoundName[]{Query.OFFSET, Query.HITS});
break;
}
//TODO: FederationTarget
//TODO: only for target produced by this, not others
- targetHandler.modifyTargetQuery(outgoing);
+ target.modifyTargetQuery(outgoing);
return outgoing;
}
- private void propagatePerSourceQueryProperties(Query original, Query outgoing,
- Map<String, Object> windowParameters,
+ private void propagatePerSourceQueryProperties(Query original, Query outgoing, Window window,
String sourceName, String providerName,
CompoundName[] queryProperties) {
-
for (CompoundName key : queryProperties) {
- Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, windowParameters.get(key.toString()));
- if (value != null) {
+ Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, window.get(key));
+ if (value != null)
outgoing.properties().set(key, value);
- }
}
}
@@ -552,12 +290,10 @@ public class FederationSearcher extends ForkingSearcher {
result = getProperty(query, new ProviderKey(providerName, propertyName.toString()));
if (result == null)
result = defaultValue;
-
return result;
}
private Object getProperty(Query query, CompoundKey key) {
-
CompoundName name = map.get(key);
if (name == null) {
name = new CompoundName(key.toString());
@@ -567,22 +303,15 @@ public class FederationSearcher extends ForkingSearcher {
}
private ErrorMessage missingSearchChainsErrorMessage(List<UnresolvedSearchChainException> unresolvedSearchChainExceptions) {
- StringBuilder sb = new StringBuilder();
- sb.append(StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' '));
-
-
- sb.append(" Valid source refs are ");
- sb.append(
- StringUtils.join(allSourceRefDescriptions().iterator(),
- ", ")).append('.');
-
- return ErrorMessage.createInvalidQueryParameter(sb.toString());
+ String message = StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' ') +
+ " Valid source refs are " + StringUtils.join(allSourceRefDescriptions().iterator(), ", ") +'.';
+ return ErrorMessage.createInvalidQueryParameter(message);
}
private List<String> allSourceRefDescriptions() {
List<String> descriptions = new ArrayList<>();
- for (Target target : searchChainResolver.allTopLevelTargets()) {
+ for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.allTopLevelTargets()) {
descriptions.add(target.searchRefDescription());
}
return descriptions;
@@ -598,7 +327,6 @@ public class FederationSearcher extends ForkingSearcher {
private void warnIfUnresolvedSearchChains(List<UnresolvedSearchChainException> missingTargets,
HitGroup errorHitGroup) {
-
if (!missingTargets.isEmpty()) {
errorHitGroup.addError(missingSearchChainsErrorMessage(missingTargets));
}
@@ -608,7 +336,7 @@ public class FederationSearcher extends ForkingSearcher {
public Collection<CommentedSearchChain> getSearchChainsForwarded(SearchChainRegistry registry) {
List<CommentedSearchChain> searchChains = new ArrayList<>();
- for (Target target : searchChainResolver.allTopLevelTargets()) {
+ for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.allTopLevelTargets()) {
if (target instanceof SourcesTarget) {
searchChains.addAll(commentedSourceProviderSearchChains((SourcesTarget)target, registry));
} else if (target instanceof SingleTarget) {
@@ -623,12 +351,11 @@ public class FederationSearcher extends ForkingSearcher {
private CommentedSearchChain commentedSearchChain(SingleTarget singleTarget, SearchChainRegistry registry) {
return new CommentedSearchChain("If source refs contains '" + singleTarget.getId() + "'.",
- registry.getChain(singleTarget.getId()));
+ registry.getChain(singleTarget.getId()));
}
private List<CommentedSearchChain> commentedSourceProviderSearchChains(SourcesTarget sourcesTarget,
SearchChainRegistry registry) {
-
List<CommentedSearchChain> commentedSearchChains = new ArrayList<>();
String ifMatchingSourceRefPrefix = "If source refs contains '" + sourcesTarget.getId() + "' and provider is '";
@@ -646,9 +373,11 @@ public class FederationSearcher extends ForkingSearcher {
return commentedSearchChains;
}
- /** Returns the set of properties set for the source or provider given in the query (if any).
+ /**
+ * Returns the set of properties set for the source or provider given in the query (if any).
*
- * If the query has not set sourceName or providerName, null will be returned */
+ * If the query has not set sourceName or providerName, null will be returned
+ */
public static Properties getSourceProperties(Query query) {
String sourceName = query.properties().getString(SOURCENAME);
String providerName = query.properties().getString(PROVIDERNAME);
@@ -661,12 +390,12 @@ public class FederationSearcher extends ForkingSearcher {
}
@Override
- public void fill(final Result result, final String summaryClass, Execution execution) {
+ public void fill(Result result, String summaryClass, Execution execution) {
List<FutureResult> filledResults = new ArrayList<>();
UniqueExecutionsToResults uniqueExecutionsToResults = new UniqueExecutionsToResults();
addResultsToFill(result.hits(), result, summaryClass, uniqueExecutionsToResults);
- final Set<Entry<Chain<Searcher>, Map<Query, Result>>> resultsForAllChains = uniqueExecutionsToResults.resultsToFill
- .entrySet();
+ Set<Entry<Chain<Searcher>, Map<Query, Result>>> resultsForAllChains =
+ uniqueExecutionsToResults.resultsToFill.entrySet();
int numberOfCallsToFillNeeded = 0;
for (Entry<Chain<Searcher>, Map<Query, Result>> resultsToFillForAChain : resultsForAllChains) {
@@ -699,31 +428,6 @@ public class FederationSearcher extends ForkingSearcher {
destination.hits().addError(error);
}
- /** A map from a unique search chain and query instance to a result */
- private static class UniqueExecutionsToResults {
-
- /** Implemented as a nested identity hashmap */
- final Map<Chain<Searcher>,Map<Query,Result>> resultsToFill = new IdentityHashMap<>();
-
- /** Returns a result to fill for a query and chain, by creating it if necessary */
- public Result get(Chain<Searcher> chain, Query query) {
- Map<Query,Result> resultsToFillForAChain = resultsToFill.get(chain);
- if (resultsToFillForAChain == null) {
- resultsToFillForAChain = new IdentityHashMap<>();
- resultsToFill.put(chain,resultsToFillForAChain);
- }
-
- Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query);
- if (resultsToFillForAChainAndQuery == null) {
- resultsToFillForAChainAndQuery = new Result(query);
- resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery);
- }
-
- return resultsToFillForAChainAndQuery;
- }
-
- }
-
private void addResultsToFill(HitGroup hitGroup, Result result, String summaryClass,
UniqueExecutionsToResults uniqueExecutionsToResults) {
for (Hit hit : hitGroup) {
@@ -744,28 +448,6 @@ public class FederationSearcher extends ForkingSearcher {
return uniqueExecutionsToResults.get(chain,query);
}
- private void searchMultipleTargets(Query query, Result mergedResults,
- Collection<TargetHandler> targets,
- long timeout,
- Execution execution) {
-
- List<ExecutionInfo> executionInfos = startExecuteQueryForEachTarget(query, targets, timeout, execution);
- waitForMandatoryTargets(executionInfos, query.getTimeout());
-
- HitOrderer s=null;
- for (ExecutionInfo executionInfo : executionInfos) {
- if ( ! successfullyCompleted(executionInfo.futureResult)) {
- addSearchChainTimedOutError(query, executionInfo.targetHandler.getId());
- } else {
- if (s == null) {
- s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), executionInfo.futureResult.get().hits().getOrderer());
- }
- mergeResult(query, executionInfo.targetHandler, mergedResults, executionInfo.futureResult.get());
-
- }
- }
- }
-
/**
* TODO This is probably a dirty hack for bug 4711376. There are probably better ways.
* But I will leave that to trd-processing@
@@ -785,46 +467,6 @@ public class FederationSearcher extends ForkingSearcher {
return orderer;
}
- private void waitForMandatoryTargets(List<ExecutionInfo> executionInfos, long queryTimeout) {
- FutureWaiter futureWaiter = new FutureWaiter();
-
- boolean hasMandatoryTargets = false;
- for (ExecutionInfo executionInfo : executionInfos) {
- if (isMandatory(executionInfo)) {
- futureWaiter.add(executionInfo.futureResult,
- getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout));
- hasMandatoryTargets = true;
- }
- }
-
- if (!hasMandatoryTargets) {
- for (ExecutionInfo executionInfo : executionInfos) {
- futureWaiter.add(executionInfo.futureResult,
- getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout));
- }
- }
-
- futureWaiter.waitForFutures();
- }
-
- private long getSearchChainExecutionTimeoutInMilliseconds(ExecutionInfo executionInfo, long queryTimeout) {
- return executionInfo.federationOptions.
- getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
- }
-
- private boolean isMandatory(ExecutionInfo executionInfo) {
- return !executionInfo.federationOptions.getOptional();
- }
-
- private void searchSingleTarget(Query query, Result mergedResults,
- TargetHandler targetHandler,
- long timeout,
- Execution execution) {
- Result result = startExecuteSingleQuery(query, targetHandler, timeout, execution);
- mergeResult(query, targetHandler, mergedResults, result);
- }
-
-
private Results<SearchChainInvocationSpec, UnresolvedSearchChainException> getTargets(Set<String> sources, Properties properties, IndexFacts indexFacts) {
return sources.isEmpty() ?
defaultSearchChains(properties):
@@ -845,11 +487,10 @@ public class FederationSearcher extends ForkingSearcher {
return result.build();
}
-
public Results<SearchChainInvocationSpec, UnresolvedSearchChainException> defaultSearchChains(Properties sourceToProviderMap) {
Results.Builder<SearchChainInvocationSpec, UnresolvedSearchChainException> result = new Builder<>();
- for (Target target : searchChainResolver.defaultTargets()) {
+ for (com.yahoo.search.federation.sourceref.Target target : searchChainResolver.defaultTargets()) {
try {
result.addData(target.responsibleSearchChain(sourceToProviderMap));
} catch (UnresolvedSearchChainException e) {
@@ -865,66 +506,70 @@ public class FederationSearcher extends ForkingSearcher {
try {
return new ComponentSpecification(source);
} catch(Exception e) {
- throw new IllegalArgumentException("The source ref '" + source
- + "' used for federation is not valid.", e);
+ throw new IllegalArgumentException("The source ref '" + source + "' used for federation is not valid.", e);
}
}
- @Override
- public Result search(Query query, Execution execution) {
- Result mergedResults = execution.search(query);
-
- Results<SearchChainInvocationSpec, UnresolvedSearchChainException> targets =
- getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts());
- warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits());
-
- Collection<SearchChainInvocationSpec> prunedTargets =
- pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data());
-
- Results<TargetHandler, ErrorMessage> regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry());
- query.errors().addAll(regularTargetHandlers.errors());
-
- Set<TargetHandler> targetHandlers = new LinkedHashSet<>(regularTargetHandlers.data());
- targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector));
-
- long targetsTimeout = calculateTimeout(query, targetHandlers);
- if (targetsTimeout < 0)
- return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate"));
-
- traceTargets(query, targetHandlers);
-
- if (targetHandlers.size() == 0) {
- return mergedResults;
- } else if (targetHandlers.size() == 1 &&
- ! shouldExecuteTargetLongerThanThread(query, targetHandlers.iterator().next())) {
- TargetHandler chain = first(targetHandlers);
- searchSingleTarget(query, mergedResults, chain, targetsTimeout, execution);
- } else {
- searchMultipleTargets(query, mergedResults, targetHandlers, targetsTimeout, execution);
- }
-
- return mergedResults;
- }
-
- private void traceTargets(Query query, Collection<TargetHandler> targetHandlers) {
+ private void traceTargets(Query query, Collection<Target> targets) {
int traceFederationLevel = 2;
if ( ! query.isTraceable(traceFederationLevel)) return;
- query.trace("Federating to " + targetHandlers, traceFederationLevel);
+ query.trace("Federating to " + targets, traceFederationLevel);
}
/**
* Returns true if we are requested to keep executing a target longer than we're waiting for it.
* This is useful to populate caches inside targets.
*/
- private boolean shouldExecuteTargetLongerThanThread(Query query, TargetHandler target) {
+ private boolean shouldExecuteTargetLongerThanThread(Query query, Target target) {
return target.federationOptions().getRequestTimeoutInMilliseconds() > query.getTimeout();
}
- private static Results<TargetHandler, ErrorMessage> resolveSearchChains(
- Collection<SearchChainInvocationSpec> prunedTargets,
- SearchChainRegistry registry) {
+ private void addSearchChainTimedOutError(Query query, ComponentId searchChainId) {
+ ErrorMessage timeoutMessage = ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out.");
+ timeoutMessage.setSource(searchChainId.stringValue());
+ query.errors().add(timeoutMessage);
+ }
- Results.Builder<TargetHandler, ErrorMessage> targetHandlers = new Results.Builder<>();
+ private void mergeResult(Query query, Target target, Result mergedResults, Result result) {
+ target.modifyTargetResult(result);
+ ComponentId searchChainId = target.getId();
+ Chain<Searcher> searchChain = target.getChain();
+
+ mergedResults.mergeWith(result);
+ HitGroup group = result.hits();
+ group.setId("source:" + searchChainId.getName());
+
+ group.setSearcherSpecificMetaData(this, searchChain);
+ group.setMeta(false); // Set hit groups as non-meta as a default
+ group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count
+ group.setSource(searchChainId.getName());
+ group.setQuery(result.getQuery());
+
+ for (Iterator<Hit> it = group.unorderedDeepIterator(); it.hasNext();) {
+ Hit hit = it.next();
+ hit.setSearcherSpecificMetaData(this, searchChain);
+ hit.setSource(searchChainId.stringValue());
+
+ // This is the backend request meta hit, that is holding logging information
+ // See HTTPBackendSearcher, where this hit is created
+ if (hit.isMeta() && hit.types().contains("logging")) {
+ // Augment this hit with count fields
+ hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount());
+ hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount());
+ int offset = result.getQuery().getOffset();
+ hit.setField(LOG_COUNT_PREFIX + "first", offset + 1);
+ hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset);
+ }
+
+ }
+ if (query.getTraceLevel()>=4)
+ query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4);
+ mergedResults.hits().add(group);
+ }
+
+ private Results<Target, ErrorMessage> resolveSearchChains(Collection<SearchChainInvocationSpec> prunedTargets,
+ SearchChainRegistry registry) {
+ Results.Builder<Target, ErrorMessage> targetHandlers = new Results.Builder<>();
for (SearchChainInvocationSpec target: prunedTargets) {
Chain<Searcher> chain = registry.getChain(target.searchChainId);
@@ -932,19 +577,19 @@ public class FederationSearcher extends ForkingSearcher {
targetHandlers.addError(ErrorMessage.createIllegalQuery("Could not find search chain '"
+ target.searchChainId + "'"));
} else {
- targetHandlers.addData(new StandardTargetHandler(target, chain));
+ targetHandlers.addData(new StandardTarget(target, chain));
}
}
return targetHandlers.build();
}
- private static <T> List<TargetHandler> getAdditionalTargets(Query query, Execution execution, TargetSelector<T> targetSelector) {
+ private static <T> List<Target> getAdditionalTargets(Query query, Execution execution, TargetSelector<T> targetSelector) {
if (targetSelector == null) return Collections.emptyList();
- ArrayList<TargetHandler> result = new ArrayList<>();
+ ArrayList<Target> result = new ArrayList<>();
for (FederationTarget<T> target: targetSelector.getTargets(query, execution.searchChainRegistry()))
- result.add(new CustomTargetHandler<>(targetSelector, target));
+ result.add(new CustomTarget<>(targetSelector, target));
return result;
}
@@ -971,4 +616,227 @@ public class FederationSearcher extends ForkingSearcher {
return false;
}
+ /** A map from a unique search chain and query instance to a result */
+ private static class UniqueExecutionsToResults {
+
+ /** Implemented as a nested identity hashmap */
+ final Map<Chain<Searcher>,Map<Query,Result>> resultsToFill = new IdentityHashMap<>();
+
+ /** Returns a result to fill for a query and chain, by creating it if necessary */
+ public Result get(Chain<Searcher> chain, Query query) {
+ Map<Query,Result> resultsToFillForAChain = resultsToFill.get(chain);
+ if (resultsToFillForAChain == null) {
+ resultsToFillForAChain = new IdentityHashMap<>();
+ resultsToFill.put(chain,resultsToFillForAChain);
+ }
+
+ Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query);
+ if (resultsToFillForAChainAndQuery == null) {
+ resultsToFillForAChainAndQuery = new Result(query);
+ resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery);
+ }
+
+ return resultsToFillForAChainAndQuery;
+ }
+
+ }
+
+ /** A target for federation, containing a chain to which a federation query can be forwarded. */
+ static abstract class Target {
+
+ abstract Chain<Searcher> getChain();
+ abstract void modifyTargetQuery(Query query);
+ abstract void modifyTargetResult(Result result);
+
+ ComponentId getId() {
+ return getChain().getId();
+ }
+
+ public abstract FederationOptions federationOptions();
+
+ @Override
+ public String toString() { return getChain().getId().stringValue(); }
+
+ }
+
+ /**
+ * A handler representing a target created by the federation logic.
+ * This is a value object, to ensure that identical target invocations are not invoked multiple times.
+ */
+ private static class StandardTarget extends Target {
+
+ private final SearchChainInvocationSpec target;
+ private final Chain<Searcher> chain;
+
+ public StandardTarget(SearchChainInvocationSpec target, Chain<Searcher> chain) {
+ this.target = target;
+ this.chain = chain;
+ }
+
+ @Override
+ Chain<Searcher> getChain() { return chain; }
+
+ @Override
+ void modifyTargetQuery(Query query) {}
+ @Override
+ void modifyTargetResult(Result result) {}
+
+ @Override
+ public FederationOptions federationOptions() { return target.federationOptions; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if ( ! ( o instanceof StandardTarget)) return false;
+
+ StandardTarget other = (StandardTarget)o;
+ if ( ! Objects.equals(other.chain.getId(), this.chain.getId())) return false;
+ if ( ! Objects.equals(other.target, this.target)) return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() { return Objects.hash(chain.getId(), target); }
+
+ }
+
+ /** A target handler where the target generation logic is delegated to the application provided target selector */
+ private static class CustomTarget<T> extends Target {
+
+ private final TargetSelector<T> selector;
+ private final FederationTarget<T> target;
+
+ CustomTarget(TargetSelector<T> selector, FederationTarget<T> target) {
+ this.selector = selector;
+ this.target = target;
+ }
+
+ @Override
+ Chain<Searcher> getChain() {
+ return target.getChain();
+ }
+
+ @Override
+ public void modifyTargetQuery(Query query) {
+ selector.modifyTargetQuery(target, query);
+ }
+
+ @Override
+ public void modifyTargetResult(Result result) {
+ selector.modifyTargetResult(target, result);
+ }
+
+ @Override
+ public FederationOptions federationOptions() {
+ return target.getFederationOptions();
+ }
+
+ }
+
+ private static class CompoundKey {
+
+ private final String sourceName;
+ private final String propertyName;
+
+ CompoundKey(String sourceName, String propertyName) {
+ this.sourceName = sourceName;
+ this.propertyName = propertyName;
+ }
+
+ @Override
+ public int hashCode() {
+ return sourceName.hashCode() ^ propertyName.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ CompoundKey rhs = (CompoundKey) o;
+ return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName);
+ }
+
+ @Override
+ public String toString() {
+ return sourceName + '.' + propertyName;
+ }
+ }
+
+ private static class SourceKey extends CompoundKey {
+
+ public static final String SOURCE = "source.";
+
+ SourceKey(String sourceName, String propertyName) {
+ super(sourceName, propertyName);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() ^ 7;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof SourceKey) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return SOURCE + super.toString();
+ }
+ }
+
+ private static class ProviderKey extends CompoundKey {
+
+ public static final String PROVIDER = "provider.";
+
+ ProviderKey(String sourceName, String propertyName) {
+ super(sourceName, propertyName);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() ^ 17;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ProviderKey) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return PROVIDER + super.toString();
+ }
+
+ }
+
+ private static class Window {
+
+ private final int hits;
+ private final int offset;
+
+ public Window(int hits, int offset) {
+ this.hits = hits;
+ this.offset = offset;
+ }
+
+ public Integer get(CompoundName parameterName) {
+ if (parameterName.equals(Query.HITS)) return hits;
+ if (parameterName.equals(Query.OFFSET)) return offset;
+ return null;
+ }
+
+ public static Window from(Query query) {
+ return new Window(query.getHits(), query.getOffset());
+ }
+
+
+ public static Window from(Collection<Target> targets, Query query) {
+ if (targets.size() == 1) // preserve requested top-level offsets
+ return Window.from(query);
+ else // request from offset 0 to enable correct upstream blending into a single top-level hit list
+ return new Window(query.getHits() + query.getOffset(), 0);
+ }
+
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java
index 7e8f3553cab..0c70abbf570 100644
--- a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java
@@ -21,7 +21,7 @@ import com.yahoo.search.searchchain.Execution;
*
* @see FederationSearcher
* @since 5.0.13
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author Steinar Knutsen
*/
@After("*")
public class ForwardingSearcher extends PingableSearcher {
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java
deleted file mode 100644
index 52cd5397489..00000000000
--- a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.federation;
-
-import com.yahoo.search.searchchain.FutureResult;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author tonytv
- */
-class FutureWaiter {
- private class Future {
- final FutureResult result;
- final long timeoutInMilliseconds;
-
- public Future(FutureResult result, long timeoutInMilliseconds) {
- this.result = result;
- this.timeoutInMilliseconds = timeoutInMilliseconds;
- }
- }
-
- private List<Future> futures = new ArrayList<>();
-
- public void add(FutureResult futureResult, long timeoutInMilliseconds) {
- futures.add(new Future(futureResult, timeoutInMilliseconds));
- }
-
- public void waitForFutures() {
- sortFuturesByTimeoutDescending();
-
- final long startTime = System.currentTimeMillis();
-
- for (Future future : futures) {
- long timeToWait = startTime + future.timeoutInMilliseconds - System.currentTimeMillis();
- if (timeToWait <= 0)
- break;
-
- future.result.get(timeToWait, TimeUnit.MILLISECONDS);
- }
- }
-
- private void sortFuturesByTimeoutDescending() {
- Collections.sort(futures, new Comparator<Future>() {
- @Override
- public int compare(Future lhs, Future rhs) {
- return -compareLongs(lhs.timeoutInMilliseconds, rhs.timeoutInMilliseconds);
- }
-
- private int compareLongs(long lhs, long rhs) {
- return new Long(lhs).compareTo(rhs);
- }
- });
- }
-}
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
index e1794a73a93..739337add14 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/AsyncExecution.java
@@ -40,7 +40,7 @@ import java.util.concurrent.*;
* </p>
*
* @see com.yahoo.search.searchchain.Execution
- * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author Arne Bergene Fossaa
*/
public class AsyncExecution {
@@ -174,9 +174,8 @@ public class AsyncExecution {
* collection
*/
public static List<Result> waitForAll(Collection<FutureResult> tasks, long timeoutMs) {
-
// Copy the list in case it is modified while we are waiting
- final List<FutureResult> workingTasks = new ArrayList<>(tasks);
+ List<FutureResult> workingTasks = new ArrayList<>(tasks);
try {
runTask(() -> {
for (FutureResult task : workingTasks)
@@ -186,15 +185,13 @@ public class AsyncExecution {
// Handle timeouts below
}
- final List<Result> results = new ArrayList<>(tasks.size());
+ List<Result> results = new ArrayList<>(tasks.size());
for (FutureResult atask : workingTasks) {
Result result;
if (atask.isDone() && !atask.isCancelled()) {
- result = atask.get(); // Since isDone() = true, this won't
- // block.
+ result = atask.get(); // Since isDone() = true, this won't block.
} else { // Not done and no errors thrown
- result = new Result(atask.getQuery(),
- atask.createTimeoutError());
+ result = new Result(atask.getQuery(), atask.createTimeoutError());
}
results.add(result);
}
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
index 877252f07e6..ec87305da3b 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/FutureResult.java
@@ -1,6 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.searchchain;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@@ -16,6 +17,8 @@ import com.yahoo.search.result.ErrorMessage;
/**
* Extends a {@code FutureTask<Result>}, with some added error handling
+ *
+ * @author bratseth
*/
public class FutureResult extends FutureTask<Result> {
@@ -26,51 +29,56 @@ public class FutureResult extends FutureTask<Result> {
private final static Logger log = Logger.getLogger(FutureResult.class.getName());
- FutureResult(Callable<Result> callable, Execution execution, final Query query) {
+ protected FutureResult(Callable<Result> callable, Execution execution, Query query) {
super(callable);
this.query = query;
this.execution = execution;
}
+ /**
+ * Returns a Result containing the hits returned from this source, or an error otherwise.
+ * This will block for however long it takes to get the result: Using this is a bad idea.
+ */
@Override
public Result get() {
- Result result;
try {
- result = super.get();
+ return super.get();
}
catch (InterruptedException e) {
- result = new Result(getQuery(), ErrorMessage.createUnspecifiedError(
- "'" + execution + "' was interrupted while executing: " + Exceptions.toMessageString(e)));
+ return new Result(getQuery(), createInterruptedError(e));
}
catch (ExecutionException e) {
- log.log(Level.WARNING,"Exception on executing " + execution + " for " + query,e);
- result = new Result(getQuery(), ErrorMessage.createErrorInPluginSearcher(
- "Error in '" + execution + "': " + Exceptions.toMessageString(e),
- e.getCause()));
+ return new Result(getQuery(), createExecutionError(e));
}
- return result;
}
+ /**
+ * Returns a Result containing the hits returned from this source, or an error otherwise.
+ * This blocks for at most the given timeout and returns a Result containing a timeout error
+ * if the result is not available within this time.
+ */
@Override
public Result get(long timeout, TimeUnit timeunit) {
- Result result;
+ return getIfAvailable(timeout, timeunit).orElse(new Result(getQuery(), createTimeoutError()));
+ }
+
+ /**
+ * Same as get(timeout, timeunit) but returns Optiona.empty instead of a result with error if the result is
+ * not available in time
+ */
+ public Optional<Result> getIfAvailable(long timeout, TimeUnit timeunit) {
try {
- result = super.get(timeout, timeunit);
+ return Optional.of(super.get(timeout, timeunit));
}
catch (InterruptedException e) {
- result = new Result(getQuery(), ErrorMessage.createUnspecifiedError(
- "'" + execution + "' was interrupted while executing: " + Exceptions.toMessageString(e)));
+ return Optional.of(new Result(getQuery(), createInterruptedError(e)));
}
catch (ExecutionException e) {
- log.log(Level.WARNING,"Exception on executing " + execution + " for " + query, e);
- result = new Result(getQuery(), ErrorMessage.createErrorInPluginSearcher(
- "Error in '" + execution + "': " + Exceptions.toMessageString(e),
- e.getCause()));
+ return Optional.of(new Result(getQuery(), createExecutionError(e)));
}
catch (TimeoutException e) {
- result = new Result(getQuery(), createTimeoutError());
+ return Optional.empty();
}
- return result;
}
/** Returns the query used in this execution, never null */
@@ -78,9 +86,19 @@ public class FutureResult extends FutureTask<Result> {
return query;
}
- ErrorMessage createTimeoutError() {
- return ErrorMessage.createTimeout(
- "Error executing '" + execution + "': " + " Chain timed out.");
+ private ErrorMessage createInterruptedError(Exception e) {
+ return ErrorMessage.createUnspecifiedError("'" + execution + "' was interrupted while executing: " +
+ Exceptions.toMessageString(e));
+ }
+
+ private ErrorMessage createExecutionError(Exception e) {
+ log.log(Level.WARNING,"Exception on executing " + execution + " for " + query,e);
+ return ErrorMessage.createErrorInPluginSearcher("Error in '" + execution + "': " + Exceptions.toMessageString(e),
+ e.getCause());
+ }
+ ErrorMessage createTimeoutError() {
+ return ErrorMessage.createTimeout("Error executing '" + execution + "': " + " Chain timed out.");
}
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java b/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java
index 73e35046dd4..67753026f44 100644
--- a/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java
+++ b/container-search/src/main/java/com/yahoo/search/searchchain/model/federation/FederationOptions.java
@@ -83,9 +83,7 @@ public class FederationOptions implements Cloneable {
}
public long getSearchChainExecutionTimeoutInMilliseconds(long queryTimeout) {
- return getTimeoutInMilliseconds() >= 0 ?
- getTimeoutInMilliseconds() :
- queryTimeout;
+ return getTimeoutInMilliseconds() >= 0 ? getTimeoutInMilliseconds() : queryTimeout;
}
public boolean getUseByDefault() {
diff --git a/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java b/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java
new file mode 100644
index 00000000000..eb17cf27db9
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/federation/FederationResultTest.java
@@ -0,0 +1,146 @@
+package com.yahoo.search.federation;
+
+import com.google.common.collect.ImmutableSet;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.search.searchchain.FutureResult;
+import com.yahoo.search.searchchain.model.federation.FederationOptions;
+import org.junit.Test;
+
+import java.time.Clock;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author bratseth
+ */
+public class FederationResultTest {
+
+ private static final FederationSearcher.Target organic = new MockTarget("organic", 250);
+ private static final FederationSearcher.Target dsp1 = new MockTarget("dsp1", 120);
+ private static final FederationSearcher.Target dsp2 = new MockTarget("dsp2", 100);
+
+ private final Clock clock = Clock.systemUTC();
+
+ @Test
+ public void testFederationResult() {
+ assertTimeout(ImmutableSet.of(), 50, 100, 90);
+ assertTimeout(ImmutableSet.of(), 240, 200, 200);
+ assertTimeout(ImmutableSet.of("dsp1"), 130, 140, 110);
+ assertTimeout(ImmutableSet.of("organic"), 260, 80, 80);
+ assertTimeout(ImmutableSet.of("dsp2"), 100, 110, 115);
+ assertTimeout(ImmutableSet.of(), 100, 110, 105);
+ assertTimeout(ImmutableSet.of("dsp1", "dsp2"), 100, 130, 130);
+ assertTimeout(ImmutableSet.of("organic"), 260, 130, 130);
+ }
+
+ private void assertTimeout(Set<String> expectedTimeoutNames, int ... responseTimes) {
+ FederationResult.Builder builder = new FederationResult.Builder();
+ builder.add(organic, resultAfter(responseTimes[0]));
+ builder.add(dsp1, resultAfter(responseTimes[1]));
+ builder.add(dsp2, resultAfter(responseTimes[2]));
+ FederationResult federationResult = builder.build();
+ federationResult.waitForAll(50, clock);
+ assertEquals(3, federationResult.all().size());
+ for (FederationResult.TargetResult targetResult : federationResult.all()) {
+ Result result = targetResult.getOrTimeoutError();
+ if (expectedTimeoutNames.contains(targetResult.target.getId().toString()))
+ assertTrue(targetResult.target.getId() + " timed out", timedOut(result));
+ else
+ assertTrue(targetResult.target.getId() + " did not time out", ! timedOut(result));
+ }
+ }
+
+ private MockFutureResult resultAfter(int time) {
+ return new MockFutureResult(new Query(), time);
+ }
+
+ private boolean timedOut(Result result) {
+ ErrorMessage error = result.hits().getError();
+ if (error == null) return false;
+ return error.getCode() == ErrorMessage.timeoutCode;
+ }
+
+ private class MockFutureResult extends FutureResult {
+
+ private final int responseTime;
+ private final Query query;
+ private final long startTime;
+
+ MockFutureResult(Query query, int responseTime) {
+ super(() -> new Result(query), new Execution(Execution.Context.createContextStub()), query);
+ this.responseTime = responseTime;
+ this.query = query;
+ startTime = clock.millis();
+ }
+
+ @Override
+ public Result get() { throw new RuntimeException(); }
+
+ @Override
+ public Optional<Result> getIfAvailable(long timeout, TimeUnit timeunit) {
+ if (timeunit != TimeUnit.MILLISECONDS) throw new RuntimeException();
+
+ long elapsedTime = clock.millis() - startTime;
+ long leftUntilResponse = responseTime - elapsedTime;
+ if (leftUntilResponse > timeout) {
+ sleepUntil(timeout);
+ return Optional.empty();
+ }
+ else {
+ sleepUntil(leftUntilResponse);
+ return Optional.of(new Result(query));
+ }
+ }
+
+ private void sleepUntil(long time) {
+ if (time <= 0) return;
+ try {
+ Thread.sleep(time);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+
+ @Override
+ public Query getQuery() {
+ return query;
+ }
+
+ }
+
+ private static class MockTarget extends FederationSearcher.Target {
+
+ private final Chain<Searcher> chain;
+ private final int timeout;
+
+ MockTarget(String id, int timeout) {
+ this.chain = new Chain<>(id);
+ this.timeout = timeout;
+ }
+
+ @Override
+ Chain<Searcher> getChain() { return chain; }
+
+ @Override
+ void modifyTargetQuery(Query query) { }
+
+ @Override
+ void modifyTargetResult(Result result) { }
+
+ @Override
+ public FederationOptions federationOptions() {
+ return new FederationOptions(false, timeout, true);
+ }
+
+ }
+
+}
diff --git a/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java b/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java
deleted file mode 100644
index 37969e12399..00000000000
--- a/container-search/src/test/java/com/yahoo/search/federation/FutureWaiterTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.federation;
-
-
-/**
- * @author tonytv
- */
-// TODO: Fix or remove!
-public class FutureWaiterTest {
-
-/*
-
- @MockClass(realClass = System.class)
- public static class MockSystem {
-
- private static long currentTime;
- private static boolean firstTime;
-
- private static final long startTime = 123;
-
- @Mock
- public static synchronized long currentTimeMillis() {
- if (firstTime) {
- firstTime = false;
- return startTime;
- }
- return currentTime;
- }
-
- static synchronized void setElapsedTime(long elapsedTime) {
- firstTime = true;
- currentTime = elapsedTime + startTime;
- }
- }
-
- @Mocked()
- FutureResult result1;
-
- @Mocked()
- FutureResult result2;
-
- @Mocked()
- FutureResult result3;
-
- @Mocked()
- FutureResult result4;
-
- @Before
- public void before() {
- Mockit.setUpMock(FutureWaiterTest.MockSystem.class);
- }
-
- @After
- public void after() {
- Mockit.tearDownMocks();
- }
-
- @Test
- public void require_time_to_wait_is_adjusted_for_elapsed_time() {
- MockSystem.setElapsedTime(300);
-
- FutureWaiter futureWaiter = new FutureWaiter();
- futureWaiter.add(result1, 350);
- futureWaiter.waitForFutures();
-
- new FullVerifications() {
- {
- result1.get(350 - 300, TimeUnit.MILLISECONDS);
- }
- };
- }
-
- @Test
- public void require_do_not_wait_for_expired_timeouts() {
- MockSystem.setElapsedTime(300);
-
- FutureWaiter futureWaiter = new FutureWaiter();
- futureWaiter.add(result1, 300);
- futureWaiter.add(result2, 290);
-
- futureWaiter.waitForFutures();
-
- new FullVerifications() {
- {}
- };
- }
-
- @Test
- public void require_wait_for_largest_timeout_first() throws InterruptedException {
- MockSystem.setElapsedTime(600);
-
- FutureWaiter futureWaiter = new FutureWaiter();
- futureWaiter.add(result1, 500);
- futureWaiter.add(result4, 800);
- futureWaiter.add(result2, 600);
- futureWaiter.add(result3, 700);
-
- futureWaiter.waitForFutures();
-
- new FullVerifications() {
- {
- result4.get(800 - 600, TimeUnit.MILLISECONDS);
- result3.get(700 - 600, TimeUnit.MILLISECONDS);
- }
- };
- }
-
- */
-}
diff --git a/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java b/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java
index e4176bb6679..11fae387739 100644
--- a/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/federation/test/FederationSearcherTestCase.java
@@ -38,7 +38,7 @@ import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceP
* Test for federation searcher. The searcher is also tested in
* com.yahoo.prelude.searcher.test.BlendingSearcherTestCase.
*
- * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author Arne Bergene Fossaa
*/
@SuppressWarnings("deprecation")
public class FederationSearcherTestCase {
diff --git a/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java b/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java
index 7b0451a01ba..7aa73c64d3a 100644
--- a/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java
+++ b/container-search/src/test/java/com/yahoo/search/federation/test/FederationTester.java
@@ -18,6 +18,7 @@ import java.util.Collections;
* @author tonytv
*/
class FederationTester {
+
SearchChainResolver.Builder builder = new SearchChainResolver.Builder();
SearchChainRegistry registry = new SearchChainRegistry();