diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2017-01-24 15:19:59 +0100 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2017-01-24 15:19:59 +0100 |
commit | 827b409663ae7a0137d2e1f4c8659fcb3b15a1e5 (patch) | |
tree | 7804a1e593e433c547d46d46cd0867ba579d9d46 /container-search/src/main/java/com/yahoo/search/federation/FederationResult.java | |
parent | 6329a23f586f3a418702e6b2ca683957bc0ca554 (diff) |
Adaptive federation timeout
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/federation/FederationResult.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/federation/FederationResult.java | 131 |
1 files changed, 77 insertions, 54 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java index 8ea62c30692..85f1a185acf 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java +++ b/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java @@ -1,71 +1,117 @@ package com.yahoo.search.federation; import com.google.common.collect.ImmutableList; +import com.yahoo.search.Result; import com.yahoo.search.searchchain.FutureResult; -import com.yahoo.search.searchchain.model.federation.FederationOptions; -import java.util.Collection; +import java.time.Clock; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** - * The result of a federation to targets which knowd how to wait for the result from each target. + * The result of a federation to targets which knows how to wait for the result from each target. + * This thread handles multiple threads producing target results but only a single thread may use an instance of this. * * @author bratseth */ class FederationResult { + /** All targets of this */ private final List<TargetResult> targetResults; + /** + * The remaining targets to wait for. + * Other targets are either complete, or should only be included if they are available when we complete + */ + private List<TargetResult> targetsToWaitFor; + private FederationResult(ImmutableList<TargetResult> targetResults) { this.targetResults = targetResults; + + if (targetResults.stream().anyMatch(TargetResult::isMandatory)) + targetsToWaitFor = targetResults.stream().filter(TargetResult::isMandatory).collect(Collectors.toList()); + else + targetsToWaitFor = new ArrayList<>(targetResults); } - - public void waitForAll(long queryTimeout) { - waitForMandatoryTargets(queryTimeout); // For now ... + + /** + * Wait on each target for that targets timeout + * On the worst case this is the same as waiting for the max target timeout, + * in the average case it may be much better because lower timeout sources do not get to + * drive the timeout above their own timeout value. + * When this completes, results can be accessed from the TargetResults with no blocking + * (i.e getOrTimeout) without breaking any contract. + */ + public void waitForAll(int queryTimeout, Clock clock) { + long startTime = clock.millis(); + while ( ! targetsToWaitFor.isEmpty()) { + TargetResult nextToWaitFor = targetWithSmallestTimeout(targetsToWaitFor, queryTimeout); + long timeLeftOfNextTimeout = nextToWaitFor.timeout(queryTimeout) - ( clock.millis() - startTime ); + nextToWaitFor.getIfAvailable(timeLeftOfNextTimeout); + targetsToWaitFor.remove(nextToWaitFor); + } } /** Returns an immutable list of the results of this */ public List<TargetResult> all() { return targetResults; } - - private void waitForMandatoryTargets(long queryTimeout) { - FutureWaiter futureWaiter = new FutureWaiter(); - - boolean hasMandatoryTargets = false; - for (TargetResult targetResult : targetResults) { - if (targetResult.isMandatory()) { - futureWaiter.add(targetResult.futureResult, targetResult.getSearchChainExecutionTimeoutMs(queryTimeout)); - hasMandatoryTargets = true; - } - } - if ( ! hasMandatoryTargets) { - for (TargetResult targetResult : targetResults) { - futureWaiter.add(targetResult.futureResult, - targetResult.getSearchChainExecutionTimeoutMs(queryTimeout)); - } - } - - futureWaiter.waitForFutures(); + private TargetResult targetWithSmallestTimeout(List<TargetResult> results, int queryTimeout) { + TargetResult smallest = null; + for (TargetResult result : results) + if (smallest == null || result.timeout(queryTimeout) < smallest.timeout(queryTimeout)) + smallest = result; + return smallest; } - + static class TargetResult { final FederationSearcher.Target target; - final FutureResult futureResult; + private final FutureResult futureResult; + + /** + * Single threaded access to result already returned from futureResult, if any. + * To avoid unnecessary synchronization with the producer thread. + */ + private Optional<Result> availableResult = Optional.empty(); private TargetResult(FederationSearcher.Target target, FutureResult futureResult) { this.target = target; this.futureResult = futureResult; } + private boolean isMandatory() { return ! target.federationOptions().getOptional(); } + + /** + * Returns the result of this by blocking until timeout if necessary. + * + * @return the result if available, or null otherwise + */ + public Optional<Result> getIfAvailable(long timeout) { + if (availableResult.isPresent()) return availableResult; + availableResult = futureResult.getIfAvailable(timeout, TimeUnit.MILLISECONDS); + return availableResult; + } + + /** Returns a result without blocking; if the result is not available one with a timeout error is produced */ + public Result getOrTimeoutError() { + // The else part is to offload creation of the timeout error + return getIfAvailable(0).orElse(futureResult.get(0, TimeUnit.MILLISECONDS)); + } + public boolean successfullyCompleted() { return futureResult.isDone() && ! futureResult.isCancelled(); } - private boolean isMandatory() { return ! target.federationOptions().getOptional(); } - - private long getSearchChainExecutionTimeoutMs(long queryTimeout) { - return target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout); + private int timeout(long queryTimeout) { + return (int)target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout); + } + + @Override + public String toString() { + return "result for " + target; } } @@ -84,27 +130,4 @@ class FederationResult { } - /** Returns the max mandatory timeout, or 0 if there are no mandatory sources */ - /* - private long calculateMandatoryTimeout(Query query, Collection<FederationSearcher.Target> targets) { - long mandatoryTimeout = 0; - long queryTimeout = query.getTimeout(); - for (FederationSearcher.Target target : targets) { - if (target.federationOptions().getOptional()) continue; - mandatoryTimeout = Math.min(mandatoryTimeout, - target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout)); - } - return mandatoryTimeout; - } - - if (query.requestHasProperty("timeout")) { - mandatoryTimeout = query.getTimeLeft(); - } else { - mandatoryTimeout = calculateMandatoryTimeout(query, targetHandlers); - } - - if (mandatoryTimeout < 0) - return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate")); - - */ } |