aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
blob: a9b0632a767d4ecdf3f81ae0fb7980badb26d1ec (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.jrt.slobrok.api.Mirror;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Load balances over a set of nodes based on statistics gathered from those nodes.
 *
 * @author thomasg
 */
public class LoadBalancer {

    static class NodeMetrics {
        long sent = 0;
        long busy = 0;
        double weight = 1.0;
    }

    static class Node {
        Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; }

        Mirror.Entry entry;
        NodeMetrics metrics;
    }

    /** Statistics on each node we are load balancing over. Populated lazily. */
    private final List<NodeMetrics> nodeWeights = new ArrayList<>();
    private final Map<String, Integer> cachedIndex = new HashMap<>();

    private final String cluster;
    private double position = 0.0;

    public LoadBalancer(String cluster) {
        this.cluster = cluster;
    }

    public List<NodeMetrics> getNodeWeights() {
        return nodeWeights;
    }

    /** Returns the index from a node name string */
    int getIndex(String nodeName) {
        try {
            String s = nodeName.substring(cluster.length() + 1);
            s = s.substring(0, s.indexOf("/"));
            s = s.substring(s.lastIndexOf(".") + 1);
            return Integer.parseInt(s);
        } catch (IndexOutOfBoundsException | NumberFormatException e) {
            String err = "Expected recipient on the form '" + cluster + "/x/[y.]number/z', got '" + nodeName + "'.";
            throw new IllegalArgumentException(err, e);
        }
    }
    private int getCachedIndex(String nodeName) {
        return cachedIndex.computeIfAbsent(nodeName, key -> getIndex(key));
    }

    /**
     * The load balancing operation: Returns a node choice from the given choices,
     * based on previously gathered statistics on the nodes, and a running "position"
     * which is increased by 1 on each call to this.
     *
     * @param choices the node choices, represented as Slobrok entries
     * @return the chosen node, or null only if the given choices were zero
     */
    public Node getRecipient(List<Mirror.Entry> choices) {
        if (choices.isEmpty()) return null;

        double weightSum = 0.0;
        Node selectedNode = null;
        synchronized (this) {
            for (Mirror.Entry entry : choices) {
                NodeMetrics nodeMetrics = getNodeMetrics(entry);

                weightSum += nodeMetrics.weight;

                if (weightSum > position) {
                    selectedNode = new Node(entry, nodeMetrics);
                    break;
                }
            }
            if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
                position -= weightSum;
                selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
            }
            position += 1.0;
            selectedNode.metrics.sent++;
        }
        return selectedNode;
    }

    /**
     * Returns the node metrics at a given index.
     * If there is no entry at the given index it is created by this call.
     */
    private NodeMetrics getNodeMetrics(Mirror.Entry entry) {
        int index = getCachedIndex(entry.getName());
        // expand node array as needed
        while (nodeWeights.size() < (index + 1))
            nodeWeights.add(null);

        NodeMetrics nodeMetrics = nodeWeights.get(index);
        if (nodeMetrics == null) { // initialize statistics for this node
            nodeMetrics = new NodeMetrics();
            nodeWeights.set(index, nodeMetrics);
        }
        return nodeMetrics;
    }

    /** Scale weights such that ratios are preserved */
    private void increaseWeights() {
        for (NodeMetrics n : nodeWeights) {
            if (n == null) continue;
            double want = n.weight * 1.01010101010101010101;
            if (want >= 1.0) {
                n.weight = want;
            } else {
                n.weight = 1.0;
            }
        }
    }

    public void received(Node node, boolean busy) {
        if (busy) {
            synchronized (this) {
                double wantWeight = node.metrics.weight - 0.01;
                if (wantWeight < 1.0) {
                    increaseWeights();
                    node.metrics.weight = 1.0;
                } else {
                    node.metrics.weight = wantWeight;
                }
                node.metrics.busy++;
            }
        }
    }

}