diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-12 21:33:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-12 21:35:08 +0000 |
commit | cf214d62e2eaba0b413feec0fc525142698a8fda (patch) | |
tree | b38184c06a7d5ece08fed2e09608824a25b5f963 /documentapi | |
parent | 86cfc2945d4711d47e224a6338372d11abe7e9f3 (diff) |
Use syncronized to make the loadbalancer semantically thread safe.
Use a cache to avoid parsing a string to get an index.
Move test to same package to avoid public access to internal details.
Diffstat (limited to 'documentapi')
-rw-r--r-- | documentapi/abi-spec.json | 30 | ||||
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java | 83 | ||||
-rw-r--r-- | documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java) | 15 |
3 files changed, 54 insertions, 74 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 80fccbd9161..3092c352ee4 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -2082,35 +2082,6 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(com.yahoo.jrt.slobrok.api.Mirror$Entry, com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics)" - ], - "fields": [ - "public com.yahoo.jrt.slobrok.api.Mirror$Entry entry", - "public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics metrics" - ] - }, - "com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>()" - ], - "fields": [ - "public java.util.concurrent.atomic.AtomicLong sent", - "public java.util.concurrent.atomic.AtomicLong busy", - "public double weight" - ] - }, "com.yahoo.documentapi.messagebus.protocol.LoadBalancer": { "superClass": "java.lang.Object", "interfaces": [], @@ -2120,7 +2091,6 @@ "methods": [ "public void <init>(java.lang.String)", "public java.util.List getNodeWeights()", - "public int getIndex(java.lang.String)", "public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node getRecipient(java.util.List)", "public void received(com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node, boolean)" ], diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java index 054b120cf81..6d08dc2cee3 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java @@ -1,12 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; -import com.google.common.util.concurrent.AtomicDouble; import com.yahoo.jrt.slobrok.api.Mirror; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Map; /** * Load balances over a set of nodes based on statistics gathered from those nodes. @@ -15,24 +15,25 @@ import java.util.concurrent.atomic.AtomicLong; */ public class LoadBalancer { - public static class NodeMetrics { - public AtomicLong sent = new AtomicLong(); - public AtomicLong busy = new AtomicLong(); - public double weight = 1.0; + static class NodeMetrics { + long sent = 0; + long busy = 0; + double weight = 1.0; } - public static class Node { - public Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; } + static class Node { + Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; } - public Mirror.Entry entry; - public NodeMetrics metrics; + Mirror.Entry entry; + NodeMetrics metrics; } /** Statistics on each node we are load balancing over. Populated lazily. */ - private final List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>(); + private final List<NodeMetrics> nodeWeights = new ArrayList<>(); + private final Map<String, Integer> cachedIndex = new HashMap<>(); private final String cluster; - private final AtomicDouble safePosition = new AtomicDouble(0.0); + private double position = 0.0; public LoadBalancer(String cluster) { this.cluster = cluster; @@ -43,7 +44,7 @@ public class LoadBalancer { } /** Returns the index from a node name string */ - public int getIndex(String nodeName) { + int getIndex(String nodeName) { try { String s = nodeName.substring(cluster.length() + 1); s = s.substring(0, s.indexOf("/")); @@ -54,6 +55,14 @@ public class LoadBalancer { throw new IllegalArgumentException(err, e); } } + private int getCachedIndex(String nodeName) { + Integer index = cachedIndex.get(nodeName); + if (index == null) { + index = getIndex(nodeName); + cachedIndex.put(nodeName, index); + } + return index; + } /** * The load balancing operation: Returns a node choice from the given choices, @@ -68,24 +77,24 @@ public class LoadBalancer { double weightSum = 0.0; Node selectedNode = null; - double position = safePosition.get(); - for (Mirror.Entry entry : choices) { - NodeMetrics nodeMetrics = getNodeMetrics(entry); + synchronized (this) { + for (Mirror.Entry entry : choices) { + NodeMetrics nodeMetrics = getNodeMetrics(entry); - weightSum += nodeMetrics.weight; + weightSum += nodeMetrics.weight; - if (weightSum > position) { - selectedNode = new Node(entry, nodeMetrics); - break; + if (weightSum > position) { + selectedNode = new Node(entry, nodeMetrics); + break; + } } + if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason) + position -= weightSum; + selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); + } + position += 1.0; + selectedNode.metrics.sent++; } - if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason) - position -= weightSum; - selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); - } - position += 1.0; - safePosition.set(position); - selectedNode.metrics.sent.incrementAndGet(); return selectedNode; } @@ -94,7 +103,7 @@ public class LoadBalancer { * If there is no entry at the given index it is created by this call. */ private NodeMetrics getNodeMetrics(Mirror.Entry entry) { - int index = getIndex(entry.getName()); + int index = getCachedIndex(entry.getName()); // expand node array as needed while (nodeWeights.size() < (index + 1)) nodeWeights.add(null); @@ -122,14 +131,16 @@ public class LoadBalancer { public void received(Node node, boolean busy) { if (busy) { - double wantWeight = node.metrics.weight - 0.01; - if (wantWeight < 1.0) { - increaseWeights(); - node.metrics.weight = 1.0; - } else { - node.metrics.weight = wantWeight; + synchronized (this) { + double wantWeight = node.metrics.weight - 0.01; + if (wantWeight < 1.0) { + increaseWeights(); + node.metrics.weight = 1.0; + } else { + node.metrics.weight = wantWeight; + } + node.metrics.busy++; } - node.metrics.busy.incrementAndGet(); } } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java index 51dd1ac12b8..dc96fcaf45d 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol.test; +package com.yahoo.documentapi.messagebus.protocol; -import com.yahoo.documentapi.messagebus.protocol.LoadBalancer; import com.yahoo.jrt.slobrok.api.Mirror; import org.junit.Test; @@ -54,13 +53,13 @@ public class LoadBalancerTestCase { assertEquals("foo/" + (i % 3) + "/default" , node.entry.getName()); } - assertEquals(33, weights.get(0).sent.intValue()); - assertEquals(33, weights.get(1).sent.intValue()); - assertEquals(33, weights.get(2).sent.intValue()); + assertEquals(33, weights.get(0).sent); + assertEquals(33, weights.get(1).sent); + assertEquals(33, weights.get(2).sent); - weights.get(0).sent.set(0); - weights.get(1).sent.set(0); - weights.get(2).sent.set(0); + weights.get(0).sent = 0; + weights.get(1).sent = 0; + weights.get(2).sent = 0; } { |