aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java107
1 files changed, 24 insertions, 83 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
index a9b0632a767..c464bed5b87 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
@@ -1,4 +1,3 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.jrt.slobrok.api.Mirror;
@@ -7,20 +6,24 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
-/**
- * Load balances over a set of nodes based on statistics gathered from those nodes.
- *
- * @author thomasg
- */
-public class LoadBalancer {
-
+abstract class LoadBalancer {
static class NodeMetrics {
- long sent = 0;
- long busy = 0;
- double weight = 1.0;
+ private AtomicLong sent = new AtomicLong(0);
+ private AtomicLong received = new AtomicLong(0);
+ private AtomicLong busy = new AtomicLong(0);
+ long pending() { return sent.get() - received.get(); }
+ void incSend() { sent.incrementAndGet(); }
+ void incReceived() { received.incrementAndGet(); }
+ void incBusy() { busy.incrementAndGet(); }
+ long sent() { return sent.get(); }
+ void reset() {
+ sent.set(0);
+ received.set(0);
+ busy.set(0);
+ }
}
-
static class Node {
Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; }
@@ -28,21 +31,17 @@ public class LoadBalancer {
NodeMetrics metrics;
}
+ private final Map<String, Integer> cachedIndex = new HashMap<>();
/** Statistics on each node we are load balancing over. Populated lazily. */
private final List<NodeMetrics> nodeWeights = new ArrayList<>();
- private final Map<String, Integer> cachedIndex = new HashMap<>();
-
private final String cluster;
- private double position = 0.0;
public LoadBalancer(String cluster) {
this.cluster = cluster;
}
-
- public List<NodeMetrics> getNodeWeights() {
+ List<NodeMetrics> getNodeWeights() {
return nodeWeights;
}
-
/** Returns the index from a node name string */
int getIndex(String nodeName) {
try {
@@ -55,49 +54,14 @@ public class LoadBalancer {
throw new IllegalArgumentException(err, e);
}
}
- private int getCachedIndex(String nodeName) {
+ int getCachedIndex(String nodeName) {
return cachedIndex.computeIfAbsent(nodeName, key -> getIndex(key));
}
-
- /**
- * The load balancing operation: Returns a node choice from the given choices,
- * based on previously gathered statistics on the nodes, and a running "position"
- * which is increased by 1 on each call to this.
- *
- * @param choices the node choices, represented as Slobrok entries
- * @return the chosen node, or null only if the given choices were zero
- */
- public Node getRecipient(List<Mirror.Entry> choices) {
- if (choices.isEmpty()) return null;
-
- double weightSum = 0.0;
- Node selectedNode = null;
- synchronized (this) {
- for (Mirror.Entry entry : choices) {
- NodeMetrics nodeMetrics = getNodeMetrics(entry);
-
- weightSum += nodeMetrics.weight;
-
- if (weightSum > position) {
- selectedNode = new Node(entry, nodeMetrics);
- break;
- }
- }
- if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
- position -= weightSum;
- selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
- }
- position += 1.0;
- selectedNode.metrics.sent++;
- }
- return selectedNode;
- }
-
/**
* Returns the node metrics at a given index.
* If there is no entry at the given index it is created by this call.
*/
- private NodeMetrics getNodeMetrics(Mirror.Entry entry) {
+ protected final synchronized NodeMetrics getNodeMetrics(Mirror.Entry entry) {
int index = getCachedIndex(entry.getName());
// expand node array as needed
while (nodeWeights.size() < (index + 1))
@@ -105,38 +69,15 @@ public class LoadBalancer {
NodeMetrics nodeMetrics = nodeWeights.get(index);
if (nodeMetrics == null) { // initialize statistics for this node
- nodeMetrics = new NodeMetrics();
+ nodeMetrics = createNodeMetrics();
nodeWeights.set(index, nodeMetrics);
}
return nodeMetrics;
}
- /** Scale weights such that ratios are preserved */
- private void increaseWeights() {
- for (NodeMetrics n : nodeWeights) {
- if (n == null) continue;
- double want = n.weight * 1.01010101010101010101;
- if (want >= 1.0) {
- n.weight = want;
- } else {
- n.weight = 1.0;
- }
- }
+ protected NodeMetrics createNodeMetrics() {
+ return new NodeMetrics();
}
-
- public void received(Node node, boolean busy) {
- if (busy) {
- synchronized (this) {
- double wantWeight = node.metrics.weight - 0.01;
- if (wantWeight < 1.0) {
- increaseWeights();
- node.metrics.weight = 1.0;
- } else {
- node.metrics.weight = wantWeight;
- }
- node.metrics.busy++;
- }
- }
- }
-
+ abstract Node getRecipient(List<Mirror.Entry> choices);
+ abstract void received(Node node, boolean busy);
}