aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-06-02 10:51:37 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-06-02 10:51:37 +0000
commit17a7205ae5a33555f5a91837300e07fcbb7bb4b3 (patch)
treef6cbd1337f517c9297ef099d8856122f33042772
parent8a3850679c830c2b56e1b0d6f8c387dfa9b2aa62 (diff)
Make the current loadbalancer into 'legacy'.
Add a new 'adaptive' loadbalancer that uses select-best-of-2-random-picks.
-rw-r--r--documentapi/abi-spec.json14
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java55
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java93
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java107
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java15
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java95
6 files changed, 262 insertions, 117 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 71ef49fcd1b..8f49b51fa1c 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -2082,20 +2082,6 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.LoadBalancer": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(java.lang.String)",
- "public java.util.List getNodeWeights()",
- "public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node getRecipient(java.util.List)",
- "public void received(com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node, boolean)"
- ],
- "fields": []
- },
"com.yahoo.documentapi.messagebus.protocol.LoadBalancerPolicy": {
"superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy",
"interfaces": [],
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java
new file mode 100644
index 00000000000..4688762bebf
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java
@@ -0,0 +1,55 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.jrt.slobrok.api.Mirror;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Will pick 2 random candidates and select the one with least pending operations.
+ *
+ * @author baldersheim
+ */
+class AdaptiveLoadBalancer extends LoadBalancer {
+ private Random random = new Random();
+ AdaptiveLoadBalancer(String cluster) {
+ super(cluster);
+ }
+
+ @Override
+ Node getRecipient(List<Mirror.Entry> choices) {
+ if (choices.isEmpty()) return null;
+ Mirror.Entry entry;
+ NodeMetrics metrics;
+ if (choices.size() == 1) {
+ entry = choices.get(0);
+ metrics = getNodeMetrics(entry);
+ } else {
+ int candidateA = 0;
+ int candidateB = 1;
+ if (choices.size() > 2) {
+ candidateA = random.nextInt(choices.size());
+ candidateB = random.nextInt(choices.size());
+ }
+ entry = choices.get(candidateA);
+ Mirror.Entry entryB = choices.get(candidateB);
+ metrics = getNodeMetrics(entry);
+ NodeMetrics metricsB = getNodeMetrics(entryB);
+ if (metrics.pending() > metricsB.pending()) {
+ entry = entryB;
+ metrics = metricsB;
+ }
+ }
+ metrics.incSend();
+ return new Node(entry, metrics);
+ }
+
+ @Override
+ void received(Node node, boolean busy) {
+ node.metrics.incReceived();
+ if (busy) {
+ node.metrics.incBusy();
+ }
+ }
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java
new file mode 100644
index 00000000000..54e18302ef6
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java
@@ -0,0 +1,93 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.jrt.slobrok.api.Mirror;
+
+import java.util.List;
+
+/**
+ * Load balances over a set of nodes based on statistics gathered from those nodes.
+ *
+ * @author thomasg
+ */
+class LegacyLoadBalancer extends LoadBalancer {
+
+ static class LegacyNodeMetrics extends NodeMetrics {
+ double weight = 1.0;
+ }
+
+ private double position = 0.0;
+
+ public LegacyLoadBalancer(String cluster) {
+ super(cluster);
+ }
+
+ /**
+ * The load balancing operation: Returns a node choice from the given choices,
+ * based on previously gathered statistics on the nodes, and a running "position"
+ * which is increased by 1 on each call to this.
+ *
+ * @param choices the node choices, represented as Slobrok entries
+ * @return the chosen node, or null only if the given choices were zero
+ */
+ public Node getRecipient(List<Mirror.Entry> choices) {
+ if (choices.isEmpty()) return null;
+
+ double weightSum = 0.0;
+ Node selectedNode = null;
+ synchronized (this) {
+ for (Mirror.Entry entry : choices) {
+ LegacyNodeMetrics nodeMetrics = (LegacyNodeMetrics)getNodeMetrics(entry);
+
+ weightSum += nodeMetrics.weight;
+
+ if (weightSum > position) {
+ selectedNode = new Node(entry, nodeMetrics);
+ break;
+ }
+ }
+ if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
+ position -= weightSum;
+ selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
+ }
+ position += 1.0;
+ selectedNode.metrics.incSend();
+ }
+ return selectedNode;
+ }
+
+ protected NodeMetrics createNodeMetrics() {
+ return new LegacyNodeMetrics();
+ }
+
+ /** Scale weights such that ratios are preserved */
+ private void increaseWeights() {
+ for (NodeMetrics nodeMetrics : getNodeWeights()) {
+ LegacyNodeMetrics n = (LegacyNodeMetrics) nodeMetrics;
+ if (n == null) continue;
+ double want = n.weight * 1.01010101010101010101;
+ if (want >= 1.0) {
+ n.weight = want;
+ } else {
+ n.weight = 1.0;
+ }
+ }
+ }
+
+ public void received(Node node, boolean busy) {
+ if (busy) {
+ synchronized (this) {
+ LegacyNodeMetrics n = (LegacyNodeMetrics) node.metrics;
+ double wantWeight = n.weight - 0.01;
+ if (wantWeight < 1.0) {
+ increaseWeights();
+ n.weight = 1.0;
+ } else {
+ n.weight = wantWeight;
+ }
+ node.metrics.incBusy();
+ }
+ }
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
index a9b0632a767..c464bed5b87 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
@@ -1,4 +1,3 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.jrt.slobrok.api.Mirror;
@@ -7,20 +6,24 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
-/**
- * Load balances over a set of nodes based on statistics gathered from those nodes.
- *
- * @author thomasg
- */
-public class LoadBalancer {
-
+abstract class LoadBalancer {
static class NodeMetrics {
- long sent = 0;
- long busy = 0;
- double weight = 1.0;
+ private AtomicLong sent = new AtomicLong(0);
+ private AtomicLong received = new AtomicLong(0);
+ private AtomicLong busy = new AtomicLong(0);
+ long pending() { return sent.get() - received.get(); }
+ void incSend() { sent.incrementAndGet(); }
+ void incReceived() { received.incrementAndGet(); }
+ void incBusy() { busy.incrementAndGet(); }
+ long sent() { return sent.get(); }
+ void reset() {
+ sent.set(0);
+ received.set(0);
+ busy.set(0);
+ }
}
-
static class Node {
Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; }
@@ -28,21 +31,17 @@ public class LoadBalancer {
NodeMetrics metrics;
}
+ private final Map<String, Integer> cachedIndex = new HashMap<>();
/** Statistics on each node we are load balancing over. Populated lazily. */
private final List<NodeMetrics> nodeWeights = new ArrayList<>();
- private final Map<String, Integer> cachedIndex = new HashMap<>();
-
private final String cluster;
- private double position = 0.0;
public LoadBalancer(String cluster) {
this.cluster = cluster;
}
-
- public List<NodeMetrics> getNodeWeights() {
+ List<NodeMetrics> getNodeWeights() {
return nodeWeights;
}
-
/** Returns the index from a node name string */
int getIndex(String nodeName) {
try {
@@ -55,49 +54,14 @@ public class LoadBalancer {
throw new IllegalArgumentException(err, e);
}
}
- private int getCachedIndex(String nodeName) {
+ int getCachedIndex(String nodeName) {
return cachedIndex.computeIfAbsent(nodeName, key -> getIndex(key));
}
-
- /**
- * The load balancing operation: Returns a node choice from the given choices,
- * based on previously gathered statistics on the nodes, and a running "position"
- * which is increased by 1 on each call to this.
- *
- * @param choices the node choices, represented as Slobrok entries
- * @return the chosen node, or null only if the given choices were zero
- */
- public Node getRecipient(List<Mirror.Entry> choices) {
- if (choices.isEmpty()) return null;
-
- double weightSum = 0.0;
- Node selectedNode = null;
- synchronized (this) {
- for (Mirror.Entry entry : choices) {
- NodeMetrics nodeMetrics = getNodeMetrics(entry);
-
- weightSum += nodeMetrics.weight;
-
- if (weightSum > position) {
- selectedNode = new Node(entry, nodeMetrics);
- break;
- }
- }
- if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
- position -= weightSum;
- selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
- }
- position += 1.0;
- selectedNode.metrics.sent++;
- }
- return selectedNode;
- }
-
/**
* Returns the node metrics at a given index.
* If there is no entry at the given index it is created by this call.
*/
- private NodeMetrics getNodeMetrics(Mirror.Entry entry) {
+ protected final synchronized NodeMetrics getNodeMetrics(Mirror.Entry entry) {
int index = getCachedIndex(entry.getName());
// expand node array as needed
while (nodeWeights.size() < (index + 1))
@@ -105,38 +69,15 @@ public class LoadBalancer {
NodeMetrics nodeMetrics = nodeWeights.get(index);
if (nodeMetrics == null) { // initialize statistics for this node
- nodeMetrics = new NodeMetrics();
+ nodeMetrics = createNodeMetrics();
nodeWeights.set(index, nodeMetrics);
}
return nodeMetrics;
}
- /** Scale weights such that ratios are preserved */
- private void increaseWeights() {
- for (NodeMetrics n : nodeWeights) {
- if (n == null) continue;
- double want = n.weight * 1.01010101010101010101;
- if (want >= 1.0) {
- n.weight = want;
- } else {
- n.weight = 1.0;
- }
- }
+ protected NodeMetrics createNodeMetrics() {
+ return new NodeMetrics();
}
-
- public void received(Node node, boolean busy) {
- if (busy) {
- synchronized (this) {
- double wantWeight = node.metrics.weight - 0.01;
- if (wantWeight < 1.0) {
- increaseWeights();
- node.metrics.weight = 1.0;
- } else {
- node.metrics.weight = wantWeight;
- }
- node.metrics.busy++;
- }
- }
- }
-
+ abstract Node getRecipient(List<Mirror.Entry> choices);
+ abstract void received(Node node, boolean busy);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
index ae4606adaaf..d666a5d811d 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
@@ -48,12 +48,19 @@ public class LoadBalancerPolicy extends SlobrokPolicy {
}
pattern = cluster + "/*/" + session;
- loadBalancer = new LoadBalancer(cluster);
+ String type = params.get("type");
+ if (type == "adaptive") {
+ loadBalancer = new AdaptiveLoadBalancer(cluster);
+ } else if (type == "legacy") {
+ loadBalancer = new LegacyLoadBalancer(cluster);
+ } else {
+ loadBalancer = new LegacyLoadBalancer(cluster);
+ }
}
@Override
public void select(RoutingContext context) {
- LoadBalancer.Node node = getRecipient(context);
+ LegacyLoadBalancer.Node node = getRecipient(context);
if (node != null) {
context.setContext(node);
@@ -70,7 +77,7 @@ public class LoadBalancerPolicy extends SlobrokPolicy {
@return Returns a hop representing the TCP address of the target, or null if none could be found.
*/
- private LoadBalancer.Node getRecipient(RoutingContext context) {
+ private LegacyLoadBalancer.Node getRecipient(RoutingContext context) {
List<Mirror.Entry> lastLookup = lookup(context, pattern);
return loadBalancer.getRecipient(lastLookup);
}
@@ -78,7 +85,7 @@ public class LoadBalancerPolicy extends SlobrokPolicy {
public void merge(RoutingContext context) {
RoutingNodeIterator it = context.getChildIterator();
Reply reply = it.removeReply();
- LoadBalancer.Node target = (LoadBalancer.Node)context.getContext();
+ LegacyLoadBalancer.Node target = (LegacyLoadBalancer.Node)context.getContext();
boolean busy = false;
for (int i = 0; i < reply.getNumErrors(); i++) {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java
index dc96fcaf45d..225345afbec 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java
@@ -8,6 +8,8 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -30,7 +32,7 @@ public class LoadBalancerTestCase {
}
private static void assertIllegalArgument(String clusterName, String recipient, String expectedMessage) {
- LoadBalancer policy = new LoadBalancer(clusterName);
+ LegacyLoadBalancer policy = new LegacyLoadBalancer(clusterName);
try {
fail("Expected exception, got index " + policy.getIndex(recipient) + ".");
} catch (IllegalArgumentException e) {
@@ -39,8 +41,67 @@ public class LoadBalancerTestCase {
}
@Test
- public void testLoadBalancer() {
- LoadBalancer lb = new LoadBalancer("foo");
+ public void testAdaptiveLoadBalancer() {
+ LoadBalancer lb = new AdaptiveLoadBalancer("foo");
+
+ List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1"),
+ new Mirror.Entry("foo/1/default", "tcp/bar:2"),
+ new Mirror.Entry("foo/2/default", "tcp/bar:3"));
+ List<LoadBalancer.NodeMetrics> weights = lb.getNodeWeights();
+
+ for (int i = 0; i < 9999; i++) {
+ LoadBalancer.Node node = lb.getRecipient(entries);
+ assertNotNull(node);
+ }
+
+ long sentSum = 0;
+ for (var metrics : weights) {
+ assertTrue(10 > Math.abs(metrics.sent() - 3333));
+ sentSum += metrics.sent();
+ }
+ assertEquals(9999, sentSum);
+
+ for (var metrics : weights) {
+ metrics.reset();
+ }
+
+ // Simulate 1/1, 1/2, 1/4 processing capacity
+ for (int i = 0; i < 9999; i++) {
+ LoadBalancer.Node node = lb.getRecipient(entries);
+ assertNotNull(node);
+ if (node.entry.getName().contains("1")) {
+ lb.received(node, false);
+ } else if (node.entry.getName().contains("2")) {
+ if ((i % 2) == 0) {
+ lb.received(node, false);
+ }
+ } else {
+ if ((i % 4) == 0) {
+ lb.received(node, false);
+ }
+ }
+ }
+
+ sentSum = 0;
+ long sumPending = 0;
+ for (var metrics : weights) {
+ System.out.println("m: s=" + metrics.sent() + " p=" + metrics.pending());
+ sentSum += metrics.sent();
+ sumPending += metrics.pending();
+ }
+ assertEquals(9999, sentSum);
+ assertTrue(200 > Math.abs(sumPending - 2700));
+ assertTrue( 100 > Math.abs(weights.get(0).sent() - 1780));
+ assertTrue( 200 > Math.abs(weights.get(1).sent() - 5500));
+ assertTrue( 100 > Math.abs(weights.get(2).sent() - 2650));
+ assertTrue( 100 > Math.abs(weights.get(0).pending() - 1340));
+ assertEquals( 0, weights.get(1).pending());
+ assertTrue( 100 > Math.abs(weights.get(2).pending() - 1340));
+ }
+
+ @Test
+ public void testLegacyLoadBalancer() {
+ LoadBalancer lb = new LegacyLoadBalancer("foo");
List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1"),
new Mirror.Entry("foo/1/default", "tcp/bar:2"),
@@ -53,13 +114,13 @@ public class LoadBalancerTestCase {
assertEquals("foo/" + (i % 3) + "/default" , node.entry.getName());
}
- assertEquals(33, weights.get(0).sent);
- assertEquals(33, weights.get(1).sent);
- assertEquals(33, weights.get(2).sent);
+ assertEquals(33, weights.get(0).sent());
+ assertEquals(33, weights.get(1).sent());
+ assertEquals(33, weights.get(2).sent());
- weights.get(0).sent = 0;
- weights.get(1).sent = 0;
- weights.get(2).sent = 0;
+ weights.get(0).reset();
+ weights.get(1).reset();
+ weights.get(2).reset();
}
{
@@ -78,9 +139,9 @@ public class LoadBalancerTestCase {
lb.received(new LoadBalancer.Node(new Mirror.Entry("foo/1/default", "tcp/bar:2"), weights.get(1)), false);
}
- assertEquals(421, (int)(100 * weights.get(0).weight / weights.get(1).weight));
- assertEquals(100, (int)(100 * weights.get(1).weight));
- assertEquals(421, (int)(100 * weights.get(2).weight / weights.get(1).weight));
+ assertEquals(421, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(0)).weight / ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight));
+ assertEquals(100, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight));
+ assertEquals(421, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(2)).weight / ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight));
}
@@ -96,9 +157,7 @@ public class LoadBalancerTestCase {
assertEquals("foo/0/default" , lb.getRecipient(entries).entry.getName());
}
- @Test
- public void testLoadBalancerOneItemOnly() {
- LoadBalancer lb = new LoadBalancer("foo");
+ private void verifyLoadBalancerOneItemOnly(LoadBalancer lb) {
List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1") );
List<LoadBalancer.NodeMetrics> weights = lb.getNodeWeights();
@@ -108,6 +167,10 @@ public class LoadBalancerTestCase {
lb.received(new LoadBalancer.Node(new Mirror.Entry("foo/0/default", "tcp/bar:1"), weights.get(0)), true); // busy
assertEquals("foo/0/default" , lb.getRecipient(entries).entry.getName());
-
+ }
+ @Test
+ public void testLoadBalancerOneItemOnly() {
+ verifyLoadBalancerOneItemOnly(new LegacyLoadBalancer("foo"));
+ verifyLoadBalancerOneItemOnly(new AdaptiveLoadBalancer("foo"));
}
}