aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/federation/FederationResult.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FederationResult.java131
1 files changed, 77 insertions, 54 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
index 8ea62c30692..85f1a185acf 100644
--- a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
+++ b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
@@ -1,71 +1,117 @@
package com.yahoo.search.federation;
import com.google.common.collect.ImmutableList;
+import com.yahoo.search.Result;
import com.yahoo.search.searchchain.FutureResult;
-import com.yahoo.search.searchchain.model.federation.FederationOptions;
-import java.util.Collection;
+import java.time.Clock;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
- * The result of a federation to targets which knowd how to wait for the result from each target.
+ * The result of a federation to targets which knows how to wait for the result from each target.
+ * This thread handles multiple threads producing target results but only a single thread may use an instance of this.
*
* @author bratseth
*/
class FederationResult {
+ /** All targets of this */
private final List<TargetResult> targetResults;
+ /**
+ * The remaining targets to wait for.
+ * Other targets are either complete, or should only be included if they are available when we complete
+ */
+ private List<TargetResult> targetsToWaitFor;
+
private FederationResult(ImmutableList<TargetResult> targetResults) {
this.targetResults = targetResults;
+
+ if (targetResults.stream().anyMatch(TargetResult::isMandatory))
+ targetsToWaitFor = targetResults.stream().filter(TargetResult::isMandatory).collect(Collectors.toList());
+ else
+ targetsToWaitFor = new ArrayList<>(targetResults);
}
-
- public void waitForAll(long queryTimeout) {
- waitForMandatoryTargets(queryTimeout); // For now ...
+
+ /**
+ * Wait on each target for that targets timeout
+ * On the worst case this is the same as waiting for the max target timeout,
+ * in the average case it may be much better because lower timeout sources do not get to
+ * drive the timeout above their own timeout value.
+ * When this completes, results can be accessed from the TargetResults with no blocking
+ * (i.e getOrTimeout) without breaking any contract.
+ */
+ public void waitForAll(int queryTimeout, Clock clock) {
+ long startTime = clock.millis();
+ while ( ! targetsToWaitFor.isEmpty()) {
+ TargetResult nextToWaitFor = targetWithSmallestTimeout(targetsToWaitFor, queryTimeout);
+ long timeLeftOfNextTimeout = nextToWaitFor.timeout(queryTimeout) - ( clock.millis() - startTime );
+ nextToWaitFor.getIfAvailable(timeLeftOfNextTimeout);
+ targetsToWaitFor.remove(nextToWaitFor);
+ }
}
/** Returns an immutable list of the results of this */
public List<TargetResult> all() { return targetResults; }
-
- private void waitForMandatoryTargets(long queryTimeout) {
- FutureWaiter futureWaiter = new FutureWaiter();
-
- boolean hasMandatoryTargets = false;
- for (TargetResult targetResult : targetResults) {
- if (targetResult.isMandatory()) {
- futureWaiter.add(targetResult.futureResult, targetResult.getSearchChainExecutionTimeoutMs(queryTimeout));
- hasMandatoryTargets = true;
- }
- }
- if ( ! hasMandatoryTargets) {
- for (TargetResult targetResult : targetResults) {
- futureWaiter.add(targetResult.futureResult,
- targetResult.getSearchChainExecutionTimeoutMs(queryTimeout));
- }
- }
-
- futureWaiter.waitForFutures();
+ private TargetResult targetWithSmallestTimeout(List<TargetResult> results, int queryTimeout) {
+ TargetResult smallest = null;
+ for (TargetResult result : results)
+ if (smallest == null || result.timeout(queryTimeout) < smallest.timeout(queryTimeout))
+ smallest = result;
+ return smallest;
}
-
+
static class TargetResult {
final FederationSearcher.Target target;
- final FutureResult futureResult;
+ private final FutureResult futureResult;
+
+ /**
+ * Single threaded access to result already returned from futureResult, if any.
+ * To avoid unnecessary synchronization with the producer thread.
+ */
+ private Optional<Result> availableResult = Optional.empty();
private TargetResult(FederationSearcher.Target target, FutureResult futureResult) {
this.target = target;
this.futureResult = futureResult;
}
+ private boolean isMandatory() { return ! target.federationOptions().getOptional(); }
+
+ /**
+ * Returns the result of this by blocking until timeout if necessary.
+ *
+ * @return the result if available, or null otherwise
+ */
+ public Optional<Result> getIfAvailable(long timeout) {
+ if (availableResult.isPresent()) return availableResult;
+ availableResult = futureResult.getIfAvailable(timeout, TimeUnit.MILLISECONDS);
+ return availableResult;
+ }
+
+ /** Returns a result without blocking; if the result is not available one with a timeout error is produced */
+ public Result getOrTimeoutError() {
+ // The else part is to offload creation of the timeout error
+ return getIfAvailable(0).orElse(futureResult.get(0, TimeUnit.MILLISECONDS));
+ }
+
public boolean successfullyCompleted() {
return futureResult.isDone() && ! futureResult.isCancelled();
}
- private boolean isMandatory() { return ! target.federationOptions().getOptional(); }
-
- private long getSearchChainExecutionTimeoutMs(long queryTimeout) {
- return target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
+ private int timeout(long queryTimeout) {
+ return (int)target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
+ }
+
+ @Override
+ public String toString() {
+ return "result for " + target;
}
}
@@ -84,27 +130,4 @@ class FederationResult {
}
- /** Returns the max mandatory timeout, or 0 if there are no mandatory sources */
- /*
- private long calculateMandatoryTimeout(Query query, Collection<FederationSearcher.Target> targets) {
- long mandatoryTimeout = 0;
- long queryTimeout = query.getTimeout();
- for (FederationSearcher.Target target : targets) {
- if (target.federationOptions().getOptional()) continue;
- mandatoryTimeout = Math.min(mandatoryTimeout,
- target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout));
- }
- return mandatoryTimeout;
- }
-
- if (query.requestHasProperty("timeout")) {
- mandatoryTimeout = query.getTimeLeft();
- } else {
- mandatoryTimeout = calculateMandatoryTimeout(query, targetHandlers);
- }
-
- if (mandatoryTimeout < 0)
- return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate"));
-
- */
}