diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-17 16:04:00 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-17 16:04:00 +0200 |
commit | 9c6e5f514757a5463c9c3012462db094b84bbb65 (patch) | |
tree | d8010438827c2e3f7fc3e4fce2ce1e49bd5a4911 /container-search | |
parent | f32d81fd8d918cb2c1d9e97fa22b7649a7dd786c (diff) |
Monitor coverage status for search clusters
Diffstat (limited to 'container-search')
3 files changed, 74 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 2ccb57c6e83..520903f2210 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -35,7 +35,7 @@ public class ClusterMonitor<T> { private volatile boolean shutdown = false; /** A map from Node to corresponding MonitoredNode */ - private Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + private final Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); /** @deprecated use the constructor with just the first argument instead */ @Deprecated @@ -103,6 +103,7 @@ public class ClusterMonitor<T> { BaseNodeMonitor<T> monitor= i.next(); nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded } + nodeManager.pingIterationCompleted(); } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java index 25582e43f5e..0e33d5307c2 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java @@ -22,5 +22,8 @@ public interface NodeManager<T> { * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded */ void ping(T node, Executor executor); + + /** Called right after a ping has been issued to each node. This default implementation does nothing. */ + default void pingIterationCompleted() {} } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index c6581b14942..902d075aabc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -42,7 +42,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ - private double minActivedocsCoverage; + private double minActivedocsCoveragePercentage; private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; @@ -56,7 +56,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { } public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool) { - this.minActivedocsCoverage = minActivedocsCoverage; + this.minActivedocsCoveragePercentage = minActivedocsCoverage; size = nodes.size(); this.fs4ResourcePool = fs4ResourcePool; @@ -120,6 +120,28 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { else clusterMonitor.responded(node); } + + /** + * Update statistics after a round of issuing pings. + * Note that this doesn't wait for pings to return, so it will typically accumulate data from + * last rounds pinging, or potentially (although unlikely) some combination of new and old data. + */ + @Override + public void pingIterationCompleted() { + // Update active documents per group and use it to decide if the group should be active + for (Group group : groups.values()) + group.aggregateActiveDocuments(); + for (Group currentGroup : groups.values()) { + long sumOfAactiveDocumentsInOtherGroups = 0; + for (Group otherGroup : groups.values()) + if ( otherGroup != currentGroup) + sumOfAactiveDocumentsInOtherGroups += otherGroup.getActiveDocuments(); + long averageDocumentsInOtherGroups = sumOfAactiveDocumentsInOtherGroups / (groups.size() - 1); + currentGroup.setHasSufficientCoverage( + 100 * currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage); + } + + } private Pong getPong(FutureTask<Pong> futurePong, Node node) { try { @@ -147,7 +169,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { try { Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString()); - // TODO: Update active docs + if (pong.activeDocuments().isPresent()) + node.setActiveDocuments(pong.activeDocuments().get()); return pong; } catch (RuntimeException e) { return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " @@ -157,12 +180,14 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { } + /** A group in a search cluster. This class is multithread safe. */ public static class Group { private final int id; private final ImmutableList<Node> nodes; - private AtomicLong activeDocs = new AtomicLong(0); + private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true); + private final AtomicLong activeDocuments = new AtomicLong(0); public Group(int id, List<Node> nodes) { this.id = id; @@ -175,11 +200,38 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { /** Returns the nodes in this group as an immutable list */ public ImmutableList<Node> nodes() { return nodes; } + /** + * Returns whether this group has sufficient active documents + * (compared to other groups) that is should receive traffic + */ + public boolean hasSufficientCoverage() { + return hasSufficientCoverage.get(); + } + + void setHasSufficientCoverage(boolean sufficientCoverage) { + hasSufficientCoverage.lazySet(sufficientCoverage); + } + + + void aggregateActiveDocuments() { + long activeDocumentsInGroup = 0; + for (Node node : nodes) + activeDocumentsInGroup += node.getActiveDocuments(); + activeDocuments.set(activeDocumentsInGroup); + + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + long getActiveDocuments() { + return this.activeDocuments.get(); + } + @Override public String toString() { return "search group " + id; } } + /** A node in a search cluster. This class is multithread safe. */ public static class Node { private final String hostname; @@ -187,7 +239,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private final int group; private final AtomicBoolean working = new AtomicBoolean(true); - + private final AtomicLong activeDocuments = new AtomicLong(0); + public Node(String hostname, int port, int group) { this.hostname = hostname; this.port = port; @@ -200,13 +253,23 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { /** Returns the id of this group this node belongs to */ public int group() { return group; } - private void setWorking(boolean working) { + void setWorking(boolean working) { this.working.lazySet(working); } /** Returns whether this node is currently responding to requests */ public boolean isWorking() { return working.get(); } + /** Updates the active documents on this node */ + void setActiveDocuments(long activeDocuments) { + this.activeDocuments.set(activeDocuments); + } + + /** Returns the active documents on this node. If unknown, 0 is returned. */ + public long getActiveDocuments() { + return this.activeDocuments.get(); + } + @Override public int hashCode() { return Objects.hash(hostname, port); } |