summary refs log tree commit diff stats
path: root/container-search
diff options
context:
space:
mode:
author: Jon Bratseth <bratseth@yahoo-inc.com> 2016-08-17 16:04:00 +0200
committer: Jon Bratseth <bratseth@yahoo-inc.com> 2016-08-17 16:04:00 +0200
commit: 9c6e5f514757a5463c9c3012462db094b84bbb65 (patch)
tree: d8010438827c2e3f7fc3e4fce2ce1e49bd5a4911 /container-search
parent: f32d81fd8d918cb2c1d9e97fa22b7649a7dd786c (diff)
Monitor coverage status for search clusters
Diffstat (limited to 'container-search')
-rw-r--r-- container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java | 3
-rw-r--r-- container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java | 3
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java | 75
3 files changed, 74 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 2ccb57c6e83..520903f2210 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -35,7 +35,7 @@ public class ClusterMonitor<T> {
private volatile boolean shutdown = false;
/** A map from Node to corresponding MonitoredNode */
- private Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
+ private final Map<T, BaseNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
/** @deprecated use the constructor with just the first argument instead */
@Deprecated
@@ -103,6 +103,7 @@ public class ClusterMonitor<T> {
BaseNodeMonitor<T> monitor= i.next();
nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
}
+ nodeManager.pingIterationCompleted();
}
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
index 25582e43f5e..0e33d5307c2 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
@@ -22,5 +22,8 @@ public interface NodeManager<T> {
* This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
*/
void ping(T node, Executor executor);
+
+ /**
+  * Called right after a ping has been issued to each node. This default implementation does nothing.
+  * Implementations can override this to do bookkeeping once per ping round. Note that the call is made
+  * when the pings have been issued, which is not necessarily after they have been answered.
+  */
+ default void pingIterationCompleted() {}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
index c6581b14942..902d075aabc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
@@ -42,7 +42,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
/** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */
- private double minActivedocsCoverage;
+ private double minActivedocsCoveragePercentage;
private final int size;
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
@@ -56,7 +56,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
}
public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool) {
- this.minActivedocsCoverage = minActivedocsCoverage;
+ this.minActivedocsCoveragePercentage = minActivedocsCoverage;
size = nodes.size();
this.fs4ResourcePool = fs4ResourcePool;
@@ -120,6 +120,28 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
else
clusterMonitor.responded(node);
}
+
+ /**
+  * Updates coverage statistics after a round of pings has been issued.
+  * First aggregates the active documents of each group from its nodes, then marks each group as having
+  * sufficient coverage if its active documents, as a percentage of the average active documents of the
+  * other groups, exceeds minActivedocsCoveragePercentage.
+  * A group is considered to have sufficient coverage when there is nothing meaningful to compare against:
+  * when it is the only group, or when the other groups report no active documents (which is the case
+  * until the first pongs carrying active document counts have been received).
+  * Note that this doesn't wait for pings to return, so it will typically accumulate data from
+  * the previous round of pinging, or potentially (although unlikely) some combination of new and old data.
+  */
+ @Override
+ public void pingIterationCompleted() {
+     // Update active documents per group and use it to decide if the group should be active
+     for (Group group : groups.values())
+         group.aggregateActiveDocuments();
+
+     int otherGroupCount = groups.size() - 1;
+     for (Group currentGroup : groups.values()) {
+         long sumOfActiveDocumentsInOtherGroups = 0;
+         for (Group otherGroup : groups.values())
+             if (otherGroup != currentGroup)
+                 sumOfActiveDocumentsInOtherGroups += otherGroup.getActiveDocuments();
+
+         // Default to sufficient: this avoids dividing by zero with a single group or with empty peers
+         boolean sufficientCoverage = true;
+         if (otherGroupCount > 0 && sumOfActiveDocumentsInOtherGroups > 0) {
+             // Floating point throughout, so neither the average nor the percentage is truncated
+             double averageDocumentsInOtherGroups = (double) sumOfActiveDocumentsInOtherGroups / otherGroupCount;
+             double coveragePercentage = 100.0 * currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups;
+             sufficientCoverage = coveragePercentage > minActivedocsCoveragePercentage;
+         }
+         currentGroup.setHasSufficientCoverage(sufficientCoverage);
+     }
+ }
private Pong getPong(FutureTask<Pong> futurePong, Node node) {
try {
@@ -147,7 +169,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
try {
Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString());
- // TODO: Update active docs
+ if (pong.activeDocuments().isPresent())
+ node.setActiveDocuments(pong.activeDocuments().get());
return pong;
} catch (RuntimeException e) {
return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
@@ -157,12 +180,14 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
}
+ /** A group in a search cluster. This class is multithread safe. */
public static class Group {
private final int id;
private final ImmutableList<Node> nodes;
- private AtomicLong activeDocs = new AtomicLong(0);
+ private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
+ private final AtomicLong activeDocuments = new AtomicLong(0);
public Group(int id, List<Node> nodes) {
this.id = id;
@@ -175,11 +200,38 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
/** Returns the nodes in this group as an immutable list */
public ImmutableList<Node> nodes() { return nodes; }
+ /**
+  * Tells whether this group currently has enough active documents,
+  * compared to the other groups, that it should receive traffic.
+  */
+ public boolean hasSufficientCoverage() { return hasSufficientCoverage.get(); }
+
+ /** Records whether this group has sufficient coverage; the value is what hasSufficientCoverage() reports */
+ void setHasSufficientCoverage(boolean sufficientCoverage) { hasSufficientCoverage.lazySet(sufficientCoverage); }
+
+
+ /** Sums the active documents of all the nodes in this group and stores the total as this group's count */
+ void aggregateActiveDocuments() {
+     long total = nodes.stream().mapToLong(Node::getActiveDocuments).sum();
+     activeDocuments.set(total);
+ }
+
+ /** Returns the active documents in this group, as last stored by aggregateActiveDocuments. If unknown, 0 is returned. */
+ long getActiveDocuments() { return activeDocuments.get(); }
+
@Override
public String toString() { return "search group " + id; }
}
+ /** A node in a search cluster. This class is multithread safe. */
public static class Node {
private final String hostname;
@@ -187,7 +239,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
private final int group;
private final AtomicBoolean working = new AtomicBoolean(true);
-
+ private final AtomicLong activeDocuments = new AtomicLong(0);
+
public Node(String hostname, int port, int group) {
this.hostname = hostname;
this.port = port;
@@ -200,13 +253,23 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
/** Returns the id of this group this node belongs to */
public int group() { return group; }
- private void setWorking(boolean working) {
+ void setWorking(boolean working) {
// lazySet eventually publishes the value; a concurrent isWorking() call may briefly still see the old one
this.working.lazySet(working);
}
/** Returns whether this node is currently responding to requests */
public boolean isWorking() { return working.get(); }
+ /** Stores the given number of active documents as the current count for this node */
+ void setActiveDocuments(long activeDocuments) { this.activeDocuments.set(activeDocuments); }
+
+ /** Returns the most recently stored active document count of this node, or 0 if none has been stored */
+ public long getActiveDocuments() { return activeDocuments.get(); }
+
@Override
public int hashCode() { return Objects.hash(hostname, port); }