diff options
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java | 107 |
1 files changed, 24 insertions, 83 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java index a9b0632a767..c464bed5b87 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java @@ -1,4 +1,3 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.jrt.slobrok.api.Mirror; @@ -7,20 +6,24 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; -/** - * Load balances over a set of nodes based on statistics gathered from those nodes. - * - * @author thomasg - */ -public class LoadBalancer { - +abstract class LoadBalancer { static class NodeMetrics { - long sent = 0; - long busy = 0; - double weight = 1.0; + private AtomicLong sent = new AtomicLong(0); + private AtomicLong received = new AtomicLong(0); + private AtomicLong busy = new AtomicLong(0); + long pending() { return sent.get() - received.get(); } + void incSend() { sent.incrementAndGet(); } + void incReceived() { received.incrementAndGet(); } + void incBusy() { busy.incrementAndGet(); } + long sent() { return sent.get(); } + void reset() { + sent.set(0); + received.set(0); + busy.set(0); + } } - static class Node { Node(Mirror.Entry e, NodeMetrics m) { entry = e; metrics = m; } @@ -28,21 +31,17 @@ public class LoadBalancer { NodeMetrics metrics; } + private final Map<String, Integer> cachedIndex = new HashMap<>(); /** Statistics on each node we are load balancing over. Populated lazily. */ private final List<NodeMetrics> nodeWeights = new ArrayList<>(); - private final Map<String, Integer> cachedIndex = new HashMap<>(); - private final String cluster; - private double position = 0.0; public LoadBalancer(String cluster) { this.cluster = cluster; } - - public List<NodeMetrics> getNodeWeights() { + List<NodeMetrics> getNodeWeights() { return nodeWeights; } - /** Returns the index from a node name string */ int getIndex(String nodeName) { try { @@ -55,49 +54,14 @@ public class LoadBalancer { throw new IllegalArgumentException(err, e); } } - private int getCachedIndex(String nodeName) { + int getCachedIndex(String nodeName) { return cachedIndex.computeIfAbsent(nodeName, key -> getIndex(key)); } - - /** - * The load balancing operation: Returns a node choice from the given choices, - * based on previously gathered statistics on the nodes, and a running "position" - * which is increased by 1 on each call to this. - * - * @param choices the node choices, represented as Slobrok entries - * @return the chosen node, or null only if the given choices were zero - */ - public Node getRecipient(List<Mirror.Entry> choices) { - if (choices.isEmpty()) return null; - - double weightSum = 0.0; - Node selectedNode = null; - synchronized (this) { - for (Mirror.Entry entry : choices) { - NodeMetrics nodeMetrics = getNodeMetrics(entry); - - weightSum += nodeMetrics.weight; - - if (weightSum > position) { - selectedNode = new Node(entry, nodeMetrics); - break; - } - } - if (selectedNode == null) { // Position>sum of all weights: Wrap around (but keep the remainder for some reason) - position -= weightSum; - selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); - } - position += 1.0; - selectedNode.metrics.sent++; - } - return selectedNode; - } - /** * Returns the node metrics at a given index. * If there is no entry at the given index it is created by this call. */ - private NodeMetrics getNodeMetrics(Mirror.Entry entry) { + protected final synchronized NodeMetrics getNodeMetrics(Mirror.Entry entry) { int index = getCachedIndex(entry.getName()); // expand node array as needed while (nodeWeights.size() < (index + 1)) @@ -105,38 +69,15 @@ public class LoadBalancer { NodeMetrics nodeMetrics = nodeWeights.get(index); if (nodeMetrics == null) { // initialize statistics for this node - nodeMetrics = new NodeMetrics(); + nodeMetrics = createNodeMetrics(); nodeWeights.set(index, nodeMetrics); } return nodeMetrics; } - /** Scale weights such that ratios are preserved */ - private void increaseWeights() { - for (NodeMetrics n : nodeWeights) { - if (n == null) continue; - double want = n.weight * 1.01010101010101010101; - if (want >= 1.0) { - n.weight = want; - } else { - n.weight = 1.0; - } - } + protected NodeMetrics createNodeMetrics() { + return new NodeMetrics(); } - - public void received(Node node, boolean busy) { - if (busy) { - synchronized (this) { - double wantWeight = node.metrics.weight - 0.01; - if (wantWeight < 1.0) { - increaseWeights(); - node.metrics.weight = 1.0; - } else { - node.metrics.weight = wantWeight; - } - node.metrics.busy++; - } - } - } - + abstract Node getRecipient(List<Mirror.Entry> choices); + abstract void received(Node node, boolean busy); } |