aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
blob: 05e1ea6e2f9e6ef8d499437b7853dd3b05464d1b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.logging.Logger;

/**
 * LoadBalancer determines which group of content nodes should be accessed next for each search query when the
 * internal java dispatcher is used.
 *
 * <p>Two scheduling strategies exist:
 * <ul>
 *   <li>a round-robin scheduler, which walks the groups in order but prefers groups with sufficient coverage;</li>
 *   <li>an adaptive scheduler, which picks a group at random, weighted by the inverse of each group's decayed
 *       average search time, again preferring groups with sufficient coverage.</li>
 * </ul>
 * The round-robin scheduler is used when requested explicitly or when there is exactly one group.
 *
 * <p>{@link #takeGroup} and {@link #releaseGroup} synchronize on this instance while touching the scoreboard.
 *
 * @author ollivir
 */
public class LoadBalancer {

    private static final Logger log = Logger.getLogger(LoadBalancer.class.getName());

    /** Upper bound for the decay rate (effective averaging window, in queries) of the search time average. */
    private static final long DEFAULT_LATENCY_DECAY_RATE = 1000;
    /** Starting decay rate; it grows by one per successful query until it reaches DEFAULT_LATENCY_DECAY_RATE. */
    private static final long MIN_LATENCY_DECAY_RATE = 42;
    /** Average search time, in seconds, assumed for a group before any query has been recorded. */
    private static final double INITIAL_QUERY_TIME = 0.001;
    /** Lower bound, in seconds, applied to every recorded search time sample. */
    private static final double MIN_QUERY_TIME = 0.001;

    private final List<GroupStatus> scoreboard;
    private final GroupScheduler scheduler;

    /**
     * Creates a load balancer over the groups of the given cluster, in the cluster's group order.
     *
     * @param searchCluster the cluster whose groups are balanced over
     * @param roundRobin if true, use round-robin scheduling; otherwise adaptive scheduling is used unless there is
     *                   only a single group
     */
    public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) {
        this.scoreboard = new ArrayList<>(searchCluster.groups().size());
        for (Group group : searchCluster.orderedGroups()) {
            scoreboard.add(new GroupStatus(group));
        }
        if (roundRobin || scoreboard.size() == 1) {
            this.scheduler = new RoundRobinScheduler(scoreboard);
        } else {
            this.scheduler = new AdaptiveScheduler(new Random(), scoreboard);
        }
    }

    /**
     * Select and allocate the search cluster group which is to be used for the next search query. Callers <b>must</b> call
     * {@link #releaseGroup} symmetrically for each taken allocation.
     *
     * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set
     * @return the node group to target, or <i>empty</i> if the internal dispatch logic cannot be used
     */
    public Optional<Group> takeGroup(Set<Integer> rejectedGroups) {
        synchronized (this) {
            Optional<GroupStatus> best = scheduler.takeNextGroup(rejectedGroups);

            if (best.isPresent()) {
                GroupStatus gs = best.get();
                gs.allocate();
                Group ret = gs.group;
                log.fine(() -> "Offering <" + ret + "> for query connection");
                return Optional.of(ret);
            } else {
                return Optional.empty();
            }
        }
    }

    /**
     * Release an allocation given by {@link #takeGroup}. The release must be done exactly once for each allocation.
     * A group whose id is not on the scoreboard is ignored.
     *
     * @param group previously allocated group
     * @param success was the query successful
     * @param searchTimeMs query execution time in milliseconds, used for adaptive load balancing
     */
    public void releaseGroup(Group group, boolean success, double searchTimeMs) {
        synchronized (this) {
            for (GroupStatus sched : scoreboard) {
                if (sched.group.id() == group.id()) {
                    // the scoreboard keeps search times in seconds
                    sched.release(success, searchTimeMs / 1000.0);
                    break;
                }
            }
        }
    }

    /** Per-group bookkeeping: outstanding allocations and a decayed moving average of search time. */
    static class GroupStatus {
        private final Group group;
        private int allocations = 0;
        private long queries = 0;
        private double averageSearchTime = INITIAL_QUERY_TIME;

        GroupStatus(Group group) {
            this.group = group;
        }

        /** Records that one more query has been routed to this group. */
        void allocate() {
            allocations++;
        }

        /**
         * Records that a query routed to this group has finished.
         *
         * @param success whether the query succeeded; only successful queries update the search time average
         * @param searchTime the query's search time in seconds
         */
        void release(boolean success, double searchTime) {
            allocations--;
            if (allocations < 0) {
                log.warning("Double free of query target group detected");
                allocations = 0;
            }
            // A NaN or infinite sample would permanently poison the moving average (Math.max(NaN, x) is NaN),
            // leaving the adaptive scheduler without usable weights, so such samples are not averaged in.
            if (success && Double.isFinite(searchTime)) {
                searchTime = Math.max(searchTime, MIN_QUERY_TIME);
                // The window starts small so early samples count heavily, then widens towards the upper bound.
                double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
                averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate;
                queries++;
            }
        }

        /** Returns the decayed average search time in seconds. */
        double averageSearchTime() {
            return averageSearchTime;
        }

        /** Returns 1 / average search time; this is the group's weight in adaptive scheduling. */
        double averageSearchTimeInverse() {
            return 1.0 / averageSearchTime;
        }

        int groupId() {
            return group.id();
        }

        /** Overwrites the query count and average search time (seconds). */
        void setQueryStatistics(long queries, double averageSearchTime) {
            this.queries = queries;
            this.averageSearchTime = averageSearchTime;
        }
    }

    /** Strategy for choosing the next group from the scoreboard. */
    private interface GroupScheduler {
        Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups);
    }

    /**
     * Walks the scoreboard in order starting at a moving needle, choosing the first non-rejected group with
     * sufficient coverage, or the first non-rejected group if none has sufficient coverage.
     */
    private static class RoundRobinScheduler implements GroupScheduler {

        private int needle = 0;
        private final List<GroupStatus> scoreboard;

        public RoundRobinScheduler(List<GroupStatus> scoreboard) {
            this.scoreboard = scoreboard;
        }

        @Override
        public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
            if (scoreboard.isEmpty()) {
                // Without this guard nextScoreboardIndex would compute a modulo by zero.
                return Optional.empty();
            }
            GroupStatus bestCandidate = null;
            int bestIndex = needle;

            int index = needle;
            for (int i = 0; i < scoreboard.size(); i++) {
                GroupStatus candidate = scoreboard.get(index);
                if (rejectedGroups == null || !rejectedGroups.contains(candidate.group.id())) {
                    GroupStatus better = betterGroup(bestCandidate, candidate);
                    if (better == candidate) {
                        bestCandidate = candidate;
                        bestIndex = index;
                    }
                }
                index = nextScoreboardIndex(index);
            }
            // Continue after the chosen group next time.
            needle = nextScoreboardIndex(bestIndex);
            return Optional.ofNullable(bestCandidate);
        }

        /**
         * Select the better of the two given GroupStatus objects, biased to the first
         * parameter. Thus, if all groups have equal coverage sufficiency, the one
         * currently at the needle will be used. Either parameter can be null, in which
         * case any non-null will be preferred.
         *
         * @param first preferred GroupStatus
         * @param second potentially better GroupStatus
         * @return the better of the two
         */
        private static GroupStatus betterGroup(GroupStatus first, GroupStatus second) {
            if (second == null) {
                return first;
            }
            if (first == null) {
                return second;
            }
            // The second group wins only if it has sufficient coverage and the first does not.
            boolean secondCoversBetter = !first.group.hasSufficientCoverage() && second.group.hasSufficientCoverage();
            return secondCoversBetter ? second : first;
        }

        /** Returns the index after {@code current}, wrapping around; the scoreboard must be non-empty. */
        private int nextScoreboardIndex(int current) {
            int next = current + 1;
            if (next >= scoreboard.size()) {
                next %= scoreboard.size();
            }
            return next;
        }
    }

    /**
     * Picks a group at random, with probability proportional to the inverse of its average search time, preferring
     * groups with sufficient coverage.
     */
    static class AdaptiveScheduler implements GroupScheduler {

        private final Random random;
        private final List<GroupStatus> scoreboard;

        public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) {
            this.random = random;
            this.scoreboard = scoreboard;
        }

        /** Returns whether the group is not rejected and, if coverage is required, has sufficient coverage. */
        private static boolean isEligible(GroupStatus gs, boolean requireCoverage, Set<Integer> rejected) {
            if (rejected != null && rejected.contains(gs.group.id())) {
                return false;
            }
            return !requireCoverage || gs.group.hasSufficientCoverage();
        }

        /**
         * Selects the eligible group whose slot in the cumulative, normalized weight distribution contains the needle.
         *
         * @param needle a value in [0, 1) that picks a position in the weight distribution
         * @param requireCoverage whether only groups with sufficient coverage are eligible
         * @param rejected group ids that must not be selected, or null
         * @return the selected group, or empty if no group is eligible
         */
        private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) {
            double sum = 0;
            int n = 0;
            for (GroupStatus gs : scoreboard) {
                if (isEligible(gs, requireCoverage, rejected)) {
                    sum += gs.averageSearchTimeInverse();
                    n++;
                }
            }
            if (n == 0) {
                return Optional.empty();
            }
            double accum = 0;
            GroupStatus lastEligible = null;
            for (GroupStatus gs : scoreboard) {
                if (isEligible(gs, requireCoverage, rejected)) {
                    accum += gs.averageSearchTimeInverse();
                    if (needle < accum / sum) {
                        return Optional.of(gs);
                    }
                    lastEligible = gs;
                }
            }
            // Normally unreachable: accum is summed in the same order as sum, so the final accum / sum is 1.0.
            // It can happen with non-finite weights or a needle outside [0, 1). Fall back to an eligible group
            // rather than reporting that none is available. lastEligible is non-null because n > 0.
            return Optional.of(lastEligible);
        }

        @Override
        public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
            double needle = random.nextDouble();
            Optional<GroupStatus> gs = selectGroup(needle, true, rejectedGroups);
            if (gs.isPresent()) {
                return gs;
            }
            // fallback - any coverage better than none
            return selectGroup(needle, false, rejectedGroups);
        }
    }

}