diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-10-24 09:43:47 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-10-24 09:43:47 +0200 |
commit | 03f93552fc3ee9e2a009e837addc2c82e075e4f1 (patch) | |
tree | 3a0cb5ed4fe78436581b43636365760ab6d25a7d /container-search/src/main | |
parent | edf46c3e106da961c522add0691dfa090d8637a1 (diff) |
Pull more configuration parameters from config model to internal dispatcher config
Diffstat (limited to 'container-search/src/main')
3 files changed, 67 insertions, 28 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2f67600522f..4638583a3f5 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -44,14 +44,15 @@ public class Dispatcher extends AbstractComponent { public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); - this.loadBalancer = new LoadBalancer(searchCluster); + this.loadBalancer = new LoadBalancer(searchCluster, + dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); this.rpcResourcePool = new RpcResourcePool(dispatchConfig); } /** For testing */ public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) { this.searchCluster = null; - this.loadBalancer = new LoadBalancer(searchCluster); + this.loadBalancer = new LoadBalancer(searchCluster, true); this.rpcResourcePool = new RpcResourcePool(client, nodeConnections); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 4962746cdea..64e38a488ab 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -5,6 +5,7 @@ import com.yahoo.search.Query; import com.yahoo.search.dispatch.SearchCluster.Group; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.logging.Level; @@ -18,14 +19,13 @@ import java.util.logging.Logger; */ public class LoadBalancer { // The implementation here is a simplistic least queries in flight + round-robin load balancer - // TODO: consider the options in com.yahoo.vespa.model.content.TuningDispatch private static final Logger log = Logger.getLogger(LoadBalancer.class.getName()); private final List<GroupSchedule> scoreboard; private int needle = 0; - public LoadBalancer(SearchCluster searchCluster) { + public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) { if (searchCluster == null) { this.scoreboard = null; return; @@ -35,6 +35,11 @@ public class LoadBalancer { for (Group group : searchCluster.orderedGroups()) { scoreboard.add(new GroupSchedule(group)); } + + if(! roundRobin) { + // TODO - More randomness could be desirable + Collections.shuffle(scoreboard); + } } /** diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index f806d4e685a..5dc63bc14d6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -46,7 +46,9 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ - private double minActivedocsCoveragePercentage; + public final double minActivedocsCoveragePercentage; + public final double minGroupCoverage; + public final int maxNodesDownPerGroup; private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; @@ -67,15 +69,16 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { // Only needed until query requests are moved to rpc private final FS4ResourcePool fs4ResourcePool; - public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { - this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool, - containerClusterSize, vipStatus); + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { + this(dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(), + toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus); } - public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { + public SearchCluster(double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List<Node> nodes, FS4ResourcePool fs4ResourcePool, + int containerClusterSize, VipStatus vipStatus) { this.minActivedocsCoveragePercentage = minActivedocsCoverage; + this.minGroupCoverage = minGroupCoverage; + this.maxNodesDownPerGroup = maxNodesDownPerGroup; this.size = nodes.size(); this.fs4ResourcePool = fs4ResourcePool; this.vipStatus = vipStatus; @@ -257,27 +260,54 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { */ @Override public void pingIterationCompleted() { + int numGroups = orderedGroups.size(); + if (numGroups == 1) { + Group group = groups.values().iterator().next(); + group.aggregateActiveDocuments(); + updateSufficientCoverage(group, true); // by definition + return; + } + // Update active documents per group and use it to decide if the group should be active - for (Group group : groups.values()) + + long[] activeDocumentsInGroup = new long[numGroups]; + long sumOfActiveDocuments = 0; + for(int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); group.aggregateActiveDocuments(); - if (groups.size() == 1) { - updateSufficientCoverage(groups.values().iterator().next(), true); // by definition - } else { - for (Group currentGroup : groups.values()) { - long sumOfAactiveDocumentsInOtherGroups = 0; - for (Group otherGroup : groups.values()) - if (otherGroup != currentGroup) - sumOfAactiveDocumentsInOtherGroups += otherGroup.getActiveDocuments(); - long averageDocumentsInOtherGroups = sumOfAactiveDocumentsInOtherGroups / (groups.size() - 1); - if (averageDocumentsInOtherGroups == 0) - updateSufficientCoverage(currentGroup, true); // no information about any group; assume coverage - else - updateSufficientCoverage(currentGroup, - 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage); + activeDocumentsInGroup[i] = group.getActiveDocuments(); + sumOfActiveDocuments += activeDocumentsInGroup[i]; + } + + for (int i = 0; i < numGroups; i++) { + Group group = orderedGroups.get(i); + long activeDocuments = activeDocumentsInGroup[i]; + long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1); + boolean sufficientCoverage = true; + + if (averageDocumentsInOtherGroups > 0) { + double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; + sufficientCoverage = coverage >= minActivedocsCoveragePercentage; } + if (sufficientCoverage) { + sufficientCoverage = isNodeCoverageSufficient(group); + } + updateSufficientCoverage(group, sufficientCoverage); } } + private boolean isNodeCoverageSufficient(Group group) { + int nodesUp = 0; + for (Node node : group.nodes()) { + if (node.isWorking()) { + nodesUp++; + } + } + int nodes = group.nodes().size(); + int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) nodes * (100.0 - minGroupCoverage)) / 100.0); + return nodesUp + nodesAllowedDown >= nodes; + } + private Pong getPong(FutureTask<Pong> futurePong, Node node) { try { return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); @@ -349,8 +379,11 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { void aggregateActiveDocuments() { long activeDocumentsInGroup = 0; - for (Node node : nodes) - activeDocumentsInGroup += node.getActiveDocuments(); + for (Node node : nodes) { + if (node.isWorking()) { + activeDocumentsInGroup += node.getActiveDocuments(); + } + } activeDocuments.set(activeDocumentsInGroup); } |