aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/federation/FederationResult.java
blob: 89c45fde6ae6026b05f514c4109585302616c346 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.federation;

import com.yahoo.search.Result;
import com.yahoo.search.searchchain.FutureResult;

import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * The result of a federation to targets which knows how to wait for the result from each target.
 * This class handles multiple threads producing target results but only a single thread may use an instance of this.
 *
 * @author bratseth
 */
class FederationResult {

    /** All targets of this */
    private final List<TargetResult> targetResults;
    
    /** 
     * The remaining targets to wait for. 
     * Other targets are either complete, or should only be included if they are available when we complete.
     * This list is mutated by {@link #waitForAll}, so it must always be a mutable list owned by this instance.
     */
    private final List<TargetResult> targetsToWaitFor;
    
    /**
     * Creates a federation result over the given target results.
     * If at least one target is mandatory, only the mandatory targets are waited for;
     * otherwise all targets are waited for.
     *
     * @param targetResults the results of all targets; retained as-is and returned from {@link #all()}
     */
    private FederationResult(List<TargetResult> targetResults) {
        this.targetResults = targetResults;

        // waitForAll removes entries from this list, and Collectors.toList() makes no promise
        // about mutability, so an ArrayList is requested explicitly.
        List<TargetResult> mandatory = targetResults.stream()
                                                    .filter(TargetResult::isMandatory)
                                                    .collect(Collectors.toCollection(ArrayList::new));
        targetsToWaitFor = mandatory.isEmpty() ? new ArrayList<>(targetResults) : mandatory;
    }

    /**
     * Wait on each target for that targets timeout.
     * In the worst case this is the same as waiting for the max target timeout,
     * in the average case it may be much better because lower timeout sources do not get to
     * drive the timeout above their own timeout value.
     * When this completes, results can be accessed from the TargetResults with no blocking
     * (i.e getOrTimeout) without breaking any contract.
     *
     * @param queryTimeout the query timeout in milliseconds, passed on to each target to resolve its own timeout
     * @param clock the clock used to measure how much time has been spent waiting so far
     */
    public void waitForAll(int queryTimeout, Clock clock) {
        long startTime = clock.millis();
        while ( ! targetsToWaitFor.isEmpty()) {
            // Targets are handled in order of increasing timeout, so time spent waiting on one
            // target is subtracted from the budget of all the later ones.
            TargetResult nextToWaitFor = targetWithSmallestTimeout(targetsToWaitFor, queryTimeout);
            long elapsed = clock.millis() - startTime;
            // Once the budget of this target is used up the difference goes negative:
            // clamp to zero to make it explicit that this is then just a non-blocking check.
            long timeLeftOfNextTimeout = Math.max(0L, nextToWaitFor.timeout(queryTimeout) - elapsed);
            nextToWaitFor.getIfAvailable(timeLeftOfNextTimeout);
            targetsToWaitFor.remove(nextToWaitFor);
        }
    }
    
    /** Returns the list of the results of all targets of this, as given at construction (immutable when created by the Builder) */
    public List<TargetResult> all() { return targetResults; }

    /**
     * Returns the entry in the given list having the smallest timeout, the first such entry on ties,
     * or null if the list is empty.
     */
    private TargetResult targetWithSmallestTimeout(List<TargetResult> results, int queryTimeout) {
        TargetResult smallest = null;
        for (TargetResult result : results) {
            if (smallest == null || result.timeout(queryTimeout) < smallest.timeout(queryTimeout))
                smallest = result;
        }
        return smallest;        
    }
    
    /** A federation target paired with the future producing its result */
    static class TargetResult {

        final FederationSearcher.Target target;
        private final FutureResult futureResult;

        /** 
         * Single threaded access to result already returned from futureResult, if any.
         * To avoid unnecessary synchronization with the producer thread.
         */
        private Optional<Result> availableResult = Optional.empty();

        private TargetResult(FederationSearcher.Target target, FutureResult futureResult) {
            this.target = target;
            this.futureResult = futureResult;
        }

        /** Returns true if the federation options of the target do not mark it as optional */
        private boolean isMandatory() { return ! target.federationOptions().getOptional(); }

        /**
         * Returns the result of this by blocking until timeout if necessary. 
         * Once a result has been obtained it is cached, so the target gets to modify it only once
         * and later calls return it without consulting the future again.
         * 
         * @param timeout the max time to wait for the result, in milliseconds
         * @return the result if available, or empty otherwise
         */
        public Optional<Result> getIfAvailable(long timeout) {
            if (availableResult.isPresent()) return availableResult;
            availableResult = futureResult.getIfAvailable(timeout, TimeUnit.MILLISECONDS);
            availableResult.ifPresent(target::modifyTargetResult);
            return availableResult;
        }
        
        /** Returns a result without blocking; if the result is not available one with a timeout error is produced */
        public Result getOrTimeoutError() {
            // The fallback offloads creation of the timeout error to the future.
            // It is supplied lazily so the future is only asked when no result is available.
            return getIfAvailable(0).orElseGet(() -> futureResult.get(0, TimeUnit.MILLISECONDS));
        }
        
        /** Returns true if the future of this is done and was not cancelled */
        public boolean successfullyCompleted() {
            return futureResult.isDone() && ! futureResult.isCancelled();
        }

        /** Returns the timeout of this target in milliseconds, as resolved by its federation options for the given query timeout */
        private int timeout(long queryTimeout) {
            return (int)target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
        }
        
        @Override
        public String toString() {
            return "result for " + target;
        }

    }

    /** Collects target results and builds a FederationResult from them */
    public static class Builder {
        
        private final List<TargetResult> results = new ArrayList<>();
        
        /** Registers a target together with the future which will produce its result */
        public void add(FederationSearcher.Target target, FutureResult futureResult) {
            results.add(new TargetResult(target, futureResult));
        }
        
        /** Returns a FederationResult over an immutable snapshot of the targets added so far */
        public FederationResult build() {
            return new FederationResult(List.copyOf(results));
        }
        
    }
    
}