diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-02 10:51:37 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-02 10:51:37 +0000 |
commit | 17a7205ae5a33555f5a91837300e07fcbb7bb4b3 (patch) | |
tree | f6cbd1337f517c9297ef099d8856122f33042772 /documentapi | |
parent | 8a3850679c830c2b56e1b0d6f8c387dfa9b2aa62 (diff) |
Make the current loadbalancer into 'legacy'.
Add a new 'adaptive' loadbalancer that uses select-best-of-2-random-picks.
Diffstat (limited to 'documentapi')
6 files changed, 262 insertions, 117 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 71ef49fcd1b..8f49b51fa1c 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -2082,20 +2082,6 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.LoadBalancer": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.lang.String)", - "public java.util.List getNodeWeights()", - "public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node getRecipient(java.util.List)", - "public void received(com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node, boolean)" - ], - "fields": [] - }, "com.yahoo.documentapi.messagebus.protocol.LoadBalancerPolicy": { "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy", "interfaces": [], diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java new file mode 100644 index 00000000000..4688762bebf --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AdaptiveLoadBalancer.java @@ -0,0 +1,55 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.Mirror; + +import java.util.List; +import java.util.Random; + +/** + * Will pick 2 random candidates and select the one with least pending operations. + * + * @author baldersheim + */ +class AdaptiveLoadBalancer extends LoadBalancer { + private Random random = new Random(); + AdaptiveLoadBalancer(String cluster) { + super(cluster); + } + + @Override + Node getRecipient(List<Mirror.Entry> choices) { + if (choices.isEmpty()) return null; + Mirror.Entry entry; + NodeMetrics metrics; + if (choices.size() == 1) { + entry = choices.get(0); + metrics = getNodeMetrics(entry); + } else { + int candidateA = 0; + int candidateB = 1; + if (choices.size() > 2) { + candidateA = random.nextInt(choices.size()); + candidateB = random.nextInt(choices.size()); + } + entry = choices.get(candidateA); + Mirror.Entry entryB = choices.get(candidateB); + metrics = getNodeMetrics(entry); + NodeMetrics metricsB = getNodeMetrics(entryB); + if (metrics.pending() > metricsB.pending()) { + entry = entryB; + metrics = metricsB; + } + } + metrics.incSend(); + return new Node(entry, metrics); + } + + @Override + void received(Node node, boolean busy) { + node.metrics.incReceived(); + if (busy) { + node.metrics.incBusy(); + } + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java new file mode 100644 index 00000000000..54e18302ef6 --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LegacyLoadBalancer.java @@ -0,0 +1,93 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.Mirror; + +import java.util.List; + +/** + * Load balances over a set of nodes based on statistics gathered from those nodes. + * + * @author thomasg + */ +class LegacyLoadBalancer extends LoadBalancer { + + static class LegacyNodeMetrics extends NodeMetrics { + double weight = 1.0; + } + + private double position = 0.0; + + public LegacyLoadBalancer(String cluster) { + super(cluster); + } + + /** + * The load balancing operation: Returns a node choice from the given choices, + * based on previously gathered statistics on the nodes, and a running "position" + * which is increased by 1 on each call to this. + * + * @param choices the node choices, represented as Slobrok entries + * @return the chosen node, or null only if the given choices were zero + */ + public Node getRecipient(List<Mirror.Entry> choices) { + if (choices.isEmpty()) return null; + + double weightSum = 0.0; + Node selectedNode = null; + synchronized (this) { + for (Mirror.Entry entry : choices) { + LegacyNodeMetrics nodeMetrics = (LegacyNodeMetrics)getNodeMetrics(entry); + + weightSum += nodeMetrics.weight; + + if (weightSum > position) { + selectedNode = new Node(entry, nodeMetrics); + break; + } + } + if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason) + position -= weightSum; + selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); + } + position += 1.0; + selectedNode.metrics.incSend(); + } + return selectedNode; + } + + protected NodeMetrics createNodeMetrics() { + return new LegacyNodeMetrics(); + } + + /** Scale weights such that ratios are preserved */ + private void increaseWeights() { + for (NodeMetrics nodeMetrics : getNodeWeights()) { + LegacyNodeMetrics n = (LegacyNodeMetrics) nodeMetrics; + if (n == null) continue; + double want = n.weight * 1.01010101010101010101; + if (want >= 1.0) { + n.weight = want; + } else { + n.weight = 1.0; + } + } + } + + public void received(Node node, boolean busy) { + if (busy) { + synchronized (this) { + LegacyNodeMetrics n = (LegacyNodeMetrics) node.metrics; + double wantWeight = n.weight - 0.01; + if (wantWeight < 1.0) { + increaseWeights(); + n.weight = 1.0; + } else { + n.weight = wantWeight; + } + node.metrics.incBusy(); + } + } + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java index a9b0632a767..c464bed5b87 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java @@ -1,4 +1,3 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.jrt.slobrok.api.Mirror; @@ -7,20 +6,24 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; -/** - * Load balances over a set of nodes based on statistics gathered from those nodes. - * - * @author thomasg - */ -public class LoadBalancer { - +abstract class LoadBalancer { static class NodeMetrics { - long sent = 0; - long busy = 0; - double weight = 1.0; + private AtomicLong sent = new AtomicLong(0); + private AtomicLong received = new AtomicLong(0); + private AtomicLong busy = new AtomicLong(0); + long pending() { return sent.get() - received.get(); } + void incSend() { sent.incrementAndGet(); } + void incReceived() { received.incrementAndGet(); } + void incBusy() { busy.incrementAndGet(); } + long sent() { return sent.get(); } + void reset() { + sent.set(0); + received.set(0); + busy.set(0); + } } - static class Node { Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; } @@ -28,21 +31,17 @@ public class LoadBalancer { NodeMetrics metrics; } + private final Map<String, Integer> cachedIndex = new HashMap<>(); /** Statistics on each node we are load balancing over. Populated lazily. */ private final List<NodeMetrics> nodeWeights = new ArrayList<>(); - private final Map<String, Integer> cachedIndex = new HashMap<>(); - private final String cluster; - private double position = 0.0; public LoadBalancer(String cluster) { this.cluster = cluster; } - - public List<NodeMetrics> getNodeWeights() { + List<NodeMetrics> getNodeWeights() { return nodeWeights; } - /** Returns the index from a node name string */ int getIndex(String nodeName) { try { @@ -55,49 +54,14 @@ public class LoadBalancer { throw new IllegalArgumentException(err, e); } } - private int getCachedIndex(String nodeName) { + int getCachedIndex(String nodeName) { return cachedIndex.computeIfAbsent(nodeName, key -> getIndex(key)); } - - /** - * The load balancing operation: Returns a node choice from the given choices, - * based on previously gathered statistics on the nodes, and a running "position" - * which is increased by 1 on each call to this. - * - * @param choices the node choices, represented as Slobrok entries - * @return the chosen node, or null only if the given choices were zero - */ - public Node getRecipient(List<Mirror.Entry> choices) { - if (choices.isEmpty()) return null; - - double weightSum = 0.0; - Node selectedNode = null; - synchronized (this) { - for (Mirror.Entry entry : choices) { - NodeMetrics nodeMetrics = getNodeMetrics(entry); - - weightSum += nodeMetrics.weight; - - if (weightSum > position) { - selectedNode = new Node(entry, nodeMetrics); - break; - } - } - if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason) - position -= weightSum; - selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); - } - position += 1.0; - selectedNode.metrics.sent++; - } - return selectedNode; - } - /** * Returns the node metrics at a given index. * If there is no entry at the given index it is created by this call. */ - private NodeMetrics getNodeMetrics(Mirror.Entry entry) { + protected final synchronized NodeMetrics getNodeMetrics(Mirror.Entry entry) { int index = getCachedIndex(entry.getName()); // expand node array as needed while (nodeWeights.size() < (index + 1)) @@ -105,38 +69,15 @@ public class LoadBalancer { NodeMetrics nodeMetrics = nodeWeights.get(index); if (nodeMetrics == null) { // initialize statistics for this node - nodeMetrics = new NodeMetrics(); + nodeMetrics = createNodeMetrics(); nodeWeights.set(index, nodeMetrics); } return nodeMetrics; } - /** Scale weights such that ratios are preserved */ - private void increaseWeights() { - for (NodeMetrics n : nodeWeights) { - if (n == null) continue; - double want = n.weight * 1.01010101010101010101; - if (want >= 1.0) { - n.weight = want; - } else { - n.weight = 1.0; - } - } + protected NodeMetrics createNodeMetrics() { + return new NodeMetrics(); } - - public void received(Node node, boolean busy) { - if (busy) { - synchronized (this) { - double wantWeight = node.metrics.weight - 0.01; - if (wantWeight < 1.0) { - increaseWeights(); - node.metrics.weight = 1.0; - } else { - node.metrics.weight = wantWeight; - } - node.metrics.busy++; - } - } - } - + abstract Node getRecipient(List<Mirror.Entry> choices); + abstract void received(Node node, boolean busy); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java index ae4606adaaf..d666a5d811d 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java @@ -48,12 +48,19 @@ public class LoadBalancerPolicy extends SlobrokPolicy { } pattern = cluster + "/*/" + session; - loadBalancer = new LoadBalancer(cluster); + String type = params.get("type"); + if (type == "adaptive") { + loadBalancer = new AdaptiveLoadBalancer(cluster); + } else if (type == "legacy") { + loadBalancer = new LegacyLoadBalancer(cluster); + } else { + loadBalancer = new LegacyLoadBalancer(cluster); + } } @Override public void select(RoutingContext context) { - LoadBalancer.Node node = getRecipient(context); + LegacyLoadBalancer.Node node = getRecipient(context); if (node != null) { context.setContext(node); @@ -70,7 +77,7 @@ public class LoadBalancerPolicy extends SlobrokPolicy { @return Returns a hop representing the TCP address of the target, or null if none could be found. */ - private LoadBalancer.Node getRecipient(RoutingContext context) { + private LegacyLoadBalancer.Node getRecipient(RoutingContext context) { List<Mirror.Entry> lastLookup = lookup(context, pattern); return loadBalancer.getRecipient(lastLookup); } @@ -78,7 +85,7 @@ public class LoadBalancerPolicy extends SlobrokPolicy { public void merge(RoutingContext context) { RoutingNodeIterator it = context.getChildIterator(); Reply reply = it.removeReply(); - LoadBalancer.Node target = (LoadBalancer.Node)context.getContext(); + LegacyLoadBalancer.Node target = (LegacyLoadBalancer.Node)context.getContext(); boolean busy = false; for (int i = 0; i < reply.getNumErrors(); i++) { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java index dc96fcaf45d..225345afbec 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java @@ -8,6 +8,8 @@ import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -30,7 +32,7 @@ public class LoadBalancerTestCase { } private static void assertIllegalArgument(String clusterName, String recipient, String expectedMessage) { - LoadBalancer policy = new LoadBalancer(clusterName); + LegacyLoadBalancer policy = new LegacyLoadBalancer(clusterName); try { fail("Expected exception, got index " + policy.getIndex(recipient) + "."); } catch (IllegalArgumentException e) { @@ -39,8 +41,67 @@ public class LoadBalancerTestCase { } @Test - public void testLoadBalancer() { - LoadBalancer lb = new LoadBalancer("foo"); + public void testAdaptiveLoadBalancer() { + LoadBalancer lb = new AdaptiveLoadBalancer("foo"); + + List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1"), + new Mirror.Entry("foo/1/default", "tcp/bar:2"), + new Mirror.Entry("foo/2/default", "tcp/bar:3")); + List<LoadBalancer.NodeMetrics> weights = lb.getNodeWeights(); + + for (int i = 0; i < 9999; i++) { + LoadBalancer.Node node = lb.getRecipient(entries); + assertNotNull(node); + } + + long sentSum = 0; + for (var metrics : weights) { + assertTrue(10 > Math.abs(metrics.sent() - 3333)); + sentSum += metrics.sent(); + } + assertEquals(9999, sentSum); + + for (var metrics : weights) { + metrics.reset(); + } + + // Simulate 1/1, 1/2, 1/4 processing capacity + for (int i = 0; i < 9999; i++) { + LoadBalancer.Node node = lb.getRecipient(entries); + assertNotNull(node); + if (node.entry.getName().contains("1")) { + lb.received(node, false); + } else if (node.entry.getName().contains("2")) { + if ((i % 2) == 0) { + lb.received(node, false); + } + } else { + if ((i % 4) == 0) { + lb.received(node, false); + } + } + } + + sentSum = 0; + long sumPending = 0; + for (var metrics : weights) { + System.out.println("m: s=" + metrics.sent() + " p=" + metrics.pending()); + sentSum += metrics.sent(); + sumPending += metrics.pending(); + } + assertEquals(9999, sentSum); + assertTrue(200 > Math.abs(sumPending - 2700)); + assertTrue( 100 > Math.abs(weights.get(0).sent() - 1780)); + assertTrue( 200 > Math.abs(weights.get(1).sent() - 5500)); + assertTrue( 100 > Math.abs(weights.get(2).sent() - 2650)); + assertTrue( 100 > Math.abs(weights.get(0).pending() - 1340)); + assertEquals( 0, weights.get(1).pending()); + assertTrue( 100 > Math.abs(weights.get(2).pending() - 1340)); + } + + @Test + public void testLegacyLoadBalancer() { + LoadBalancer lb = new LegacyLoadBalancer("foo"); List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1"), new Mirror.Entry("foo/1/default", "tcp/bar:2"), @@ -53,13 +114,13 @@ public class LoadBalancerTestCase { assertEquals("foo/" + (i % 3) + "/default" , node.entry.getName()); } - assertEquals(33, weights.get(0).sent); - assertEquals(33, weights.get(1).sent); - assertEquals(33, weights.get(2).sent); + assertEquals(33, weights.get(0).sent()); + assertEquals(33, weights.get(1).sent()); + assertEquals(33, weights.get(2).sent()); - weights.get(0).sent = 0; - weights.get(1).sent = 0; - weights.get(2).sent = 0; + weights.get(0).reset(); + weights.get(1).reset(); + weights.get(2).reset(); } { @@ -78,9 +139,9 @@ public class LoadBalancerTestCase { lb.received(new LoadBalancer.Node(new Mirror.Entry("foo/1/default", "tcp/bar:2"), weights.get(1)), false); } - assertEquals(421, (int)(100 * weights.get(0).weight / weights.get(1).weight)); - assertEquals(100, (int)(100 * weights.get(1).weight)); - assertEquals(421, (int)(100 * weights.get(2).weight / weights.get(1).weight)); + assertEquals(421, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(0)).weight / ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight)); + assertEquals(100, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight)); + assertEquals(421, (int)(100 * ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(2)).weight / ((LegacyLoadBalancer.LegacyNodeMetrics)weights.get(1)).weight)); } @@ -96,9 +157,7 @@ public class LoadBalancerTestCase { assertEquals("foo/0/default" , lb.getRecipient(entries).entry.getName()); } - @Test - public void testLoadBalancerOneItemOnly() { - LoadBalancer lb = new LoadBalancer("foo"); + private void verifyLoadBalancerOneItemOnly(LoadBalancer lb) { List<Mirror.Entry> entries = Arrays.asList(new Mirror.Entry("foo/0/default", "tcp/bar:1") ); List<LoadBalancer.NodeMetrics> weights = lb.getNodeWeights(); @@ -108,6 +167,10 @@ public class LoadBalancerTestCase { lb.received(new LoadBalancer.Node(new Mirror.Entry("foo/0/default", "tcp/bar:1"), weights.get(0)), true); // busy assertEquals("foo/0/default" , lb.getRecipient(entries).entry.getName()); - + } + @Test + public void testLoadBalancerOneItemOnly() { + verifyLoadBalancerOneItemOnly(new LegacyLoadBalancer("foo")); + verifyLoadBalancerOneItemOnly(new AdaptiveLoadBalancer("foo")); } } |