summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-05-12 21:33:22 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-05-12 21:35:08 +0000
commitcf214d62e2eaba0b413feec0fc525142698a8fda (patch)
treeb38184c06a7d5ece08fed2e09608824a25b5f963 /documentapi
parent86cfc2945d4711d47e224a6338372d11abe7e9f3 (diff)
Use syncronized to make the loadbalancer semantically thread safe.
Use a cache to avoid parsing a string to get an index. Move test to same package to avoid public access to internal details.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/abi-spec.json30
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java83
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java (renamed from documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java)15
3 files changed, 54 insertions, 74 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 80fccbd9161..3092c352ee4 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -2082,35 +2082,6 @@
],
"fields": []
},
- "com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>(com.yahoo.jrt.slobrok.api.Mirror$Entry, com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics)"
- ],
- "fields": [
- "public com.yahoo.jrt.slobrok.api.Mirror$Entry entry",
- "public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics metrics"
- ]
- },
- "com.yahoo.documentapi.messagebus.protocol.LoadBalancer$NodeMetrics": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void <init>()"
- ],
- "fields": [
- "public java.util.concurrent.atomic.AtomicLong sent",
- "public java.util.concurrent.atomic.AtomicLong busy",
- "public double weight"
- ]
- },
"com.yahoo.documentapi.messagebus.protocol.LoadBalancer": {
"superClass": "java.lang.Object",
"interfaces": [],
@@ -2120,7 +2091,6 @@
"methods": [
"public void <init>(java.lang.String)",
"public java.util.List getNodeWeights()",
- "public int getIndex(java.lang.String)",
"public com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node getRecipient(java.util.List)",
"public void received(com.yahoo.documentapi.messagebus.protocol.LoadBalancer$Node, boolean)"
],
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
index 054b120cf81..6d08dc2cee3 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
@@ -1,12 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
-import com.google.common.util.concurrent.AtomicDouble;
import com.yahoo.jrt.slobrok.api.Mirror;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
/**
* Load balances over a set of nodes based on statistics gathered from those nodes.
@@ -15,24 +15,25 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class LoadBalancer {
- public static class NodeMetrics {
- public AtomicLong sent = new AtomicLong();
- public AtomicLong busy = new AtomicLong();
- public double weight = 1.0;
+ static class NodeMetrics {
+ long sent = 0;
+ long busy = 0;
+ double weight = 1.0;
}
- public static class Node {
- public Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; }
+ static class Node {
+ Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; }
- public Mirror.Entry entry;
- public NodeMetrics metrics;
+ Mirror.Entry entry;
+ NodeMetrics metrics;
}
/** Statistics on each node we are load balancing over. Populated lazily. */
- private final List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>();
+ private final List<NodeMetrics> nodeWeights = new ArrayList<>();
+ private final Map<String, Integer> cachedIndex = new HashMap<>();
private final String cluster;
- private final AtomicDouble safePosition = new AtomicDouble(0.0);
+ private double position = 0.0;
public LoadBalancer(String cluster) {
this.cluster = cluster;
@@ -43,7 +44,7 @@ public class LoadBalancer {
}
/** Returns the index from a node name string */
- public int getIndex(String nodeName) {
+ int getIndex(String nodeName) {
try {
String s = nodeName.substring(cluster.length() + 1);
s = s.substring(0, s.indexOf("/"));
@@ -54,6 +55,14 @@ public class LoadBalancer {
throw new IllegalArgumentException(err, e);
}
}
+ private int getCachedIndex(String nodeName) {
+ Integer index = cachedIndex.get(nodeName);
+ if (index == null) {
+ index = getIndex(nodeName);
+ cachedIndex.put(nodeName, index);
+ }
+ return index;
+ }
/**
* The load balancing operation: Returns a node choice from the given choices,
@@ -68,24 +77,24 @@ public class LoadBalancer {
double weightSum = 0.0;
Node selectedNode = null;
- double position = safePosition.get();
- for (Mirror.Entry entry : choices) {
- NodeMetrics nodeMetrics = getNodeMetrics(entry);
+ synchronized (this) {
+ for (Mirror.Entry entry : choices) {
+ NodeMetrics nodeMetrics = getNodeMetrics(entry);
- weightSum += nodeMetrics.weight;
+ weightSum += nodeMetrics.weight;
- if (weightSum > position) {
- selectedNode = new Node(entry, nodeMetrics);
- break;
+ if (weightSum > position) {
+ selectedNode = new Node(entry, nodeMetrics);
+ break;
+ }
}
+ if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
+ position -= weightSum;
+ selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
+ }
+ position += 1.0;
+ selectedNode.metrics.sent++;
}
- if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason)
- position -= weightSum;
- selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
- }
- position += 1.0;
- safePosition.set(position);
- selectedNode.metrics.sent.incrementAndGet();
return selectedNode;
}
@@ -94,7 +103,7 @@ public class LoadBalancer {
* If there is no entry at the given index it is created by this call.
*/
private NodeMetrics getNodeMetrics(Mirror.Entry entry) {
- int index = getIndex(entry.getName());
+ int index = getCachedIndex(entry.getName());
// expand node array as needed
while (nodeWeights.size() < (index + 1))
nodeWeights.add(null);
@@ -122,14 +131,16 @@ public class LoadBalancer {
public void received(Node node, boolean busy) {
if (busy) {
- double wantWeight = node.metrics.weight - 0.01;
- if (wantWeight < 1.0) {
- increaseWeights();
- node.metrics.weight = 1.0;
- } else {
- node.metrics.weight = wantWeight;
+ synchronized (this) {
+ double wantWeight = node.metrics.weight - 0.01;
+ if (wantWeight < 1.0) {
+ increaseWeights();
+ node.metrics.weight = 1.0;
+ } else {
+ node.metrics.weight = wantWeight;
+ }
+ node.metrics.busy++;
}
- node.metrics.busy.incrementAndGet();
}
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java
index 51dd1ac12b8..dc96fcaf45d 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/LoadBalancerTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerTestCase.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol.test;
+package com.yahoo.documentapi.messagebus.protocol;
-import com.yahoo.documentapi.messagebus.protocol.LoadBalancer;
import com.yahoo.jrt.slobrok.api.Mirror;
import org.junit.Test;
@@ -54,13 +53,13 @@ public class LoadBalancerTestCase {
assertEquals("foo/" + (i % 3) + "/default" , node.entry.getName());
}
- assertEquals(33, weights.get(0).sent.intValue());
- assertEquals(33, weights.get(1).sent.intValue());
- assertEquals(33, weights.get(2).sent.intValue());
+ assertEquals(33, weights.get(0).sent);
+ assertEquals(33, weights.get(1).sent);
+ assertEquals(33, weights.get(2).sent);
- weights.get(0).sent.set(0);
- weights.get(1).sent.set(0);
- weights.get(2).sent.set(0);
+ weights.get(0).sent = 0;
+ weights.get(1).sent = 0;
+ weights.get(2).sent = 0;
}
{