summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
blob: 22573fac2d94be6e68a8737a2786f7e4114e9f5b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;

import com.yahoo.search.Query;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * LoadBalancer determines which group of content nodes should be accessed next for each search query when the
 * internal java dispatcher is used.
 *
 * <p>Selection policy: a group with sufficient coverage is always preferred over one without; among equally
 * covered groups, the one with the fewest outstanding allocations (queries in flight) wins. Ties are resolved by
 * scan order, and each scan starts just after the previously selected index, giving round-robin behavior among
 * equally loaded groups. All mutable state is only touched while holding this instance's monitor.
 *
 * @author ollivir
 */
public class LoadBalancer {
    // The implementation here is a simplistic least queries in flight + round-robin load balancer

    private static final Logger log = Logger.getLogger(LoadBalancer.class.getName());

    /** One schedule per group; null when constructed without a search cluster (internal dispatch disabled). */
    private final List<GroupSchedule> scoreboard;
    /** Scoreboard index at which the next selection scan starts. Guarded by {@code this}. */
    private int needle = 0;

    /**
     * Creates a load balancer over the groups of the given cluster.
     *
     * @param searchCluster the cluster whose groups are balanced over, or null to disable internal dispatch
     *                      (then {@link #takeGroupForQuery} always returns empty)
     * @param roundRobin if true, groups are scanned in the cluster's ordered-group order; if false, the scan
     *                   order is shuffled once here
     */
    public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) {
        if (searchCluster == null) {
            this.scoreboard = null;
            return;
        }
        this.scoreboard = new ArrayList<>(searchCluster.groups().size());

        for (Group group : searchCluster.orderedGroups()) {
            scoreboard.add(new GroupSchedule(group));
        }

        if (! roundRobin) {
            // TODO - More randomness could be desirable
            Collections.shuffle(scoreboard);
        }
    }

    /**
     * Select and allocate the search cluster group which is to be used for the provided query. Callers <b>must</b> call
     * {@link #releaseGroup} symmetrically for each taken allocation.
     *
     * @param query the query for which this allocation is made (not currently used by the selection)
     * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set
     * @return the node group to target, or <i>empty</i> if the internal dispatch logic cannot be used, the cluster
     *         has no groups, or every group was rejected
     */
    public Optional<Group> takeGroupForQuery(Query query, Set<Integer> rejectedGroups) {
        if (scoreboard == null) {
            return Optional.empty();
        }

        return allocateNextGroup(rejectedGroups);
    }

    /**
     * Release an allocation given by {@link #takeGroupForQuery}. The release must be done exactly once for each
     * allocation. Groups are matched by id; a group unknown to this balancer is ignored.
     *
     * @param group previously allocated group
     */
    public void releaseGroup(Group group) {
        if (scoreboard == null) {
            // Internal dispatch is disabled, so takeGroupForQuery never handed out a group to release.
            return;
        }
        synchronized (this) {
            for (GroupSchedule sched : scoreboard) {
                // NOTE(review): '==' is correct only if Group.id() returns a primitive int; if it is boxed, use equals
                if (sched.group.id() == group.id()) {
                    sched.adjustScore(-1);
                    break;
                }
            }
        }
    }

    /**
     * Picks the preferred non-rejected group, scanning the scoreboard once starting at {@link #needle}, and
     * increments its allocation count. Afterwards the needle points just past the chosen index, or just past the
     * previous needle if nothing was chosen.
     */
    private Optional<Group> allocateNextGroup(Set<Integer> rejectedGroups) {
        synchronized (this) {
            GroupSchedule bestSchedule = null;

            // An empty scoreboard has nothing to offer, and advancing the needle would divide by zero.
            if (! scoreboard.isEmpty()) {
                int bestIndex = needle;
                int index = needle;
                for (int i = 0; i < scoreboard.size(); i++) {
                    GroupSchedule sched = scoreboard.get(index);
                    if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) {
                        // Strict preference: on ties, the first candidate met from the needle wins.
                        if (sched.isPreferredOver(bestSchedule)) {
                            bestSchedule = sched;
                            bestIndex = index;
                        }
                    }
                    index = nextScoreboardIndex(index);
                }
                needle = nextScoreboardIndex(bestIndex);
            }

            Group ret = null;
            if (bestSchedule != null) {
                bestSchedule.adjustScore(1);
                ret = bestSchedule.group;
            }
            if (log.isLoggable(Level.FINE)) {
                log.fine("Offering <" + ret + "> for query connection");
            }
            return Optional.ofNullable(ret);
        }
    }

    /** Returns the index following {@code current}, wrapping to 0. Requires a non-empty scoreboard. */
    private int nextScoreboardIndex(int current) {
        return (current + 1) % scoreboard.size();
    }

    /** Tracks the number of outstanding allocations for one group. Only mutated under the balancer's monitor. */
    private static final class GroupSchedule {
        private final Group group;
        private int score;

        public GroupSchedule(Group group) {
            this.group = group;
            this.score = 0;
        }

        /**
         * Returns whether this group should be chosen over {@code other} (null means "no candidate yet").
         * Sufficient coverage dominates; otherwise the lower allocation count wins, and equal counts do not win.
         */
        public boolean isPreferredOver(GroupSchedule other) {
            if (other == null) {
                return true;
            }

            boolean thisCovered = this.group.hasSufficientCoverage();
            boolean otherCovered = other.group.hasSufficientCoverage();
            if (thisCovered != otherCovered) {
                // Exactly one of the two has sufficient coverage: prefer that one regardless of load.
                return thisCovered;
            }

            return this.score < other.score;
        }

        /**
         * Adds {@code amount} to the allocation count. A negative result means more releases than allocations;
         * this is logged as a warning and the count is clamped to zero.
         */
        public void adjustScore(int amount) {
            this.score += amount;
            if (score < 0) {
                log.warning("Double free of query target group detected");
                score = 0;
            }
        }
    }
}