summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java54
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java21
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java75
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java73
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java42
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java (renamed from container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java)221
7 files changed, 281 insertions, 221 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 286eee004c5..0dd682dee0e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,6 +11,9 @@ import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.searchcluster.Group;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Arrays;
@@ -33,6 +36,8 @@ import java.util.Set;
* @author ollvir
*/
public class Dispatcher extends AbstractComponent {
+ private static final int MAX_GROUP_SELECTION_ATTEMPTS = 3;
+
/** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */
private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
@@ -66,11 +71,6 @@ public class Dispatcher extends AbstractComponent {
rpcResourcePool.release();
}
- @FunctionalInterface
- private interface SearchInvokerSupplier {
- Optional<SearchInvoker> supply(Query query, List<SearchCluster.Node> nodes);
- }
-
public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb,
FS4InvokerFactory fs4InvokerFactory) {
Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb);
@@ -102,6 +102,11 @@ public class Dispatcher extends AbstractComponent {
return Optional.empty();
}
+ @FunctionalInterface
+ private interface SearchInvokerSupplier {
+ Optional<SearchInvoker> supply(Query query, int groupId, List<Node> nodes, boolean acceptIncompleteCoverage);
+ }
+
// build invoker based on searchpath
private Optional<SearchInvoker> getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) {
String searchPath = query.getModel().getSearchPath();
@@ -109,11 +114,11 @@ public class Dispatcher extends AbstractComponent {
return Optional.empty();
}
try {
- List<SearchCluster.Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
+ List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
if (nodes.isEmpty()) {
return Optional.empty();
} else {
- return invokerFactory.supply(query, nodes);
+ return invokerFactory.supply(query, -1, nodes, true);
}
} catch (InvalidSearchPathException e) {
return Optional.of(new SearchErrorInvoker(e.getMessage()));
@@ -121,41 +126,34 @@ public class Dispatcher extends AbstractComponent {
}
private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) {
- Optional<SearchCluster.Node> directNode = searchCluster.directDispatchTarget();
+ Optional<Node> directNode = searchCluster.directDispatchTarget();
if (directNode.isPresent()) {
- SearchCluster.Node node = directNode.get();
+ Node node = directNode.get();
query.trace(false, 2, "Dispatching directly to ", node);
- return invokerFactory.supply(query, Arrays.asList(node));
+ return invokerFactory.supply(query, -1, Arrays.asList(node), true);
}
- Set<Integer> tried = null;
- int max = searchCluster.groups().size();
- for (int attempt = 0; attempt < max; attempt++) {
- Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
- if (! groupInCluster.isPresent()) {
+ int max = Integer.min(searchCluster.orderedGroups().size(), MAX_GROUP_SELECTION_ATTEMPTS);
+ Set<Integer> rejected = null;
+ for (int i = 0; i < max; i++) {
+ Optional<Group> groupInCluster = loadBalancer.takeGroupForQuery(query, rejected);
+ if (!groupInCluster.isPresent()) {
// No groups available
break;
}
- SearchCluster.Group group = groupInCluster.get();
- if (tried != null && tried.contains(group.id())) {
- // bail out: LB is offering a previously discarded group
- loadBalancer.releaseGroup(group);
- break;
- }
-
- Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.nodes());
+ Group group = groupInCluster.get();
+ boolean acceptIncompleteCoverage = (i == max - 1);
+ Optional<SearchInvoker> invoker = invokerFactory.supply(query, group.id(), group.nodes(), acceptIncompleteCoverage);
if (invoker.isPresent()) {
query.trace(false, 2, "Dispatching internally to ", group);
invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
return invoker;
} else {
- // invoker could not be produced (likely connectivity issue)
- searchCluster.groupConnectionFailure(group);
loadBalancer.releaseGroup(group);
- if (tried == null) {
- tried = new HashSet<>();
+ if (rejected == null) {
+ rejected = new HashSet<>();
}
- tried.add(group.id());
+ rejected.add(group.id());
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 64e38a488ab..22573fac2d9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -2,12 +2,14 @@
package com.yahoo.search.dispatch;
import com.yahoo.search.Query;
-import com.yahoo.search.dispatch.SearchCluster.Group;
+import com.yahoo.search.dispatch.searchcluster.Group;
+import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,18 +49,19 @@ public class LoadBalancer {
* {@link #releaseGroup} symmetrically for each taken allocation.
*
* @param query the query for which this allocation is made
+ * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set
* @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used
*/
- public Optional<Group> takeGroupForQuery(Query query) {
+ public Optional<Group> takeGroupForQuery(Query query, Set<Integer> rejectedGroups) {
if (scoreboard == null) {
return Optional.empty();
}
- return allocateNextGroup();
+ return allocateNextGroup(rejectedGroups);
}
/**
- * Release an allocation given by {@link #takeGroupForQuery(Query)}. The release must be done exactly once for each allocation.
+ * Release an allocation given by {@link #takeGroupForQuery}. The release must be done exactly once for each allocation.
*
* @param group
* previously allocated group
@@ -74,7 +77,7 @@ public class LoadBalancer {
}
}
- private Optional<Group> allocateNextGroup() {
+ private Optional<Group> allocateNextGroup(Set<Integer> rejectedGroups) {
synchronized (this) {
GroupSchedule bestSchedule = null;
int bestIndex = needle;
@@ -82,9 +85,11 @@ public class LoadBalancer {
int index = needle;
for (int i = 0; i < scoreboard.size(); i++) {
GroupSchedule sched = scoreboard.get(index);
- if (sched.isPreferredOver(bestSchedule)) {
- bestSchedule = sched;
- bestIndex = index;
+ if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) {
+ if (sched.isPreferredOver(bestSchedule)) {
+ bestSchedule = sched;
+ bestIndex = index;
+ }
}
index = nextScoreboardIndex(index);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
index 57f06225d27..6800a80b78f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
@@ -3,7 +3,9 @@ package com.yahoo.search.dispatch;
import com.google.common.collect.ImmutableCollection;
import com.yahoo.collections.Pair;
-import com.yahoo.search.dispatch.SearchCluster.Group;
+import com.yahoo.search.dispatch.searchcluster.Group;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,7 +39,7 @@ public class SearchPath {
* @return list of nodes chosen with the search path, or an empty list in which
* case some other node selection logic should be used
*/
- public static List<SearchCluster.Node> selectNodes(String searchPath, SearchCluster cluster) {
+ public static List<Node> selectNodes(String searchPath, SearchCluster cluster) {
Optional<SearchPath> sp = SearchPath.fromString(searchPath);
if (sp.isPresent()) {
return sp.get().mapToNodes(cluster);
@@ -46,7 +48,7 @@ public class SearchPath {
}
}
- public static Optional<SearchPath> fromString(String path) {
+ static Optional<SearchPath> fromString(String path) {
if (path == null || path.isEmpty()) {
return Optional.empty();
}
@@ -73,23 +75,23 @@ public class SearchPath {
this.group = group;
}
- private List<SearchCluster.Node> mapToNodes(SearchCluster cluster) {
+ private List<Node> mapToNodes(SearchCluster cluster) {
if (cluster.groups().isEmpty()) {
return Collections.emptyList();
}
- SearchCluster.Group selectedGroup = selectGroup(cluster);
+ Group selectedGroup = selectGroup(cluster);
if (nodes.isEmpty()) {
return selectedGroup.nodes();
}
- List<SearchCluster.Node> groupNodes = selectedGroup.nodes();
+ List<Node> groupNodes = selectedGroup.nodes();
Set<Integer> wanted = new HashSet<>();
int max = groupNodes.size();
for (NodeSelection node : nodes) {
wanted.addAll(node.matches(max));
}
- List<SearchCluster.Node> ret = new ArrayList<>();
+ List<Node> ret = new ArrayList<>();
for (int idx : wanted) {
ret.add(groupNodes.get(idx));
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
new file mode 100644
index 00000000000..01cbc5cd307
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -0,0 +1,75 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.searchcluster;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A group in a search cluster. This class is multithread safe.
+ *
+ * @author bratseth
+ * @author ollivir
+ */
+public class Group {
+
+ private final int id;
+ private final ImmutableList<Node> nodes;
+
+ private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
+ private final AtomicLong activeDocuments = new AtomicLong(0);
+
+ public Group(int id, List<Node> nodes) {
+ this.id = id;
+ this.nodes = ImmutableList.copyOf(nodes);
+ }
+
+ /** Returns the unique identity of this group */
+ public int id() { return id; }
+
+ /** Returns the nodes in this group as an immutable list */
+ public ImmutableList<Node> nodes() { return nodes; }
+
+ /**
+ * Returns whether this group has sufficient active documents
+ * (compared to other groups) that is should receive traffic
+ */
+ public boolean hasSufficientCoverage() {
+ return hasSufficientCoverage.get();
+ }
+
+ void setHasSufficientCoverage(boolean sufficientCoverage) {
+ hasSufficientCoverage.lazySet(sufficientCoverage);
+ }
+
+ void aggregateActiveDocuments() {
+ long activeDocumentsInGroup = 0;
+ for (Node node : nodes) {
+ if (node.isWorking()) {
+ activeDocumentsInGroup += node.getActiveDocuments();
+ }
+ }
+ activeDocuments.set(activeDocumentsInGroup);
+
+ }
+
+ /** Returns the active documents on this node. If unknown, 0 is returned. */
+ long getActiveDocuments() {
+ return this.activeDocuments.get();
+ }
+
+ @Override
+ public String toString() { return "search group " + id; }
+
+ @Override
+ public int hashCode() { return id; }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) return true;
+ if (!(other instanceof Group)) return false;
+ return ((Group) other).id == this.id;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
new file mode 100644
index 00000000000..98deb9e3199
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -0,0 +1,73 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.searchcluster;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A node in a search cluster. This class is multithread safe.
+ *
+ * @author bratseth
+ * @author ollivir
+ */
+public class Node {
+
+ private final int key;
+ private final String hostname;
+ private final int fs4port;
+ final int group;
+
+ private final AtomicBoolean working = new AtomicBoolean(true);
+ private final AtomicLong activeDocuments = new AtomicLong(0);
+
+ public Node(int key, String hostname, int fs4port, int group) {
+ this.key = key;
+ this.hostname = hostname;
+ this.fs4port = fs4port;
+ this.group = group;
+ }
+
+ /** Returns the unique and stable distribution key of this node */
+ public int key() { return key; }
+
+ public String hostname() { return hostname; }
+
+ public int fs4port() { return fs4port; }
+
+ /** Returns the id of this group this node belongs to */
+ public int group() { return group; }
+
+ public void setWorking(boolean working) {
+ this.working.lazySet(working);
+ }
+
+ /** Returns whether this node is currently responding to requests */
+ public boolean isWorking() { return working.get(); }
+
+ /** Updates the active documents on this node */
+ void setActiveDocuments(long activeDocuments) {
+ this.activeDocuments.set(activeDocuments);
+ }
+
+ /** Returns the active documents on this node. If unknown, 0 is returned. */
+ public long getActiveDocuments() {
+ return this.activeDocuments.get();
+ }
+
+ @Override
+ public int hashCode() { return Objects.hash(hostname, fs4port); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if ( ! (o instanceof Node)) return false;
+ Node other = (Node)o;
+ if ( ! Objects.equals(this.hostname, other.hostname)) return false;
+ if ( ! Objects.equals(this.fs4port, other.fs4port)) return false;
+ return true;
+ }
+
+ @Override
+ public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java
new file mode 100644
index 00000000000..7c7a9cb1d1c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java
@@ -0,0 +1,42 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.searchcluster;
+
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
+import com.yahoo.prelude.fastsearch.FastSearcher;
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.yolean.Exceptions;
+
+import java.util.concurrent.Callable;
+
+/**
+ * @author bratseth
+ * @author ollivir
+ */
+class Pinger implements Callable<Pong> {
+ private final Node node;
+ private final ClusterMonitor<Node> clusterMonitor;
+ private final FS4ResourcePool fs4ResourcePool;
+
+ public Pinger(Node node, ClusterMonitor<Node> clusterMonitor, FS4ResourcePool fs4ResourcePool) {
+ this.node = node;
+ this.clusterMonitor = clusterMonitor;
+ this.fs4ResourcePool = fs4ResourcePool;
+ }
+
+ public Pong call() {
+ try {
+ Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
+ fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString());
+ if (pong.activeDocuments().isPresent())
+ node.setActiveDocuments(pong.activeDocuments().get());
+ return pong;
+ } catch (RuntimeException e) {
+ return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
+ + Exceptions.toMessageString(e)));
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index e26dd5648eb..b0e63d20931 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -1,5 +1,5 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.searchcluster;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
@@ -11,27 +11,18 @@ import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
-
-// Only needed until query requests are moved to rpc
-import com.yahoo.prelude.Ping;
-import com.yahoo.prelude.fastsearch.FastSearcher;
-import com.yahoo.yolean.Exceptions;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -41,7 +32,7 @@ import java.util.stream.Collectors;
*
* @author bratseth
*/
-public class SearchCluster implements NodeManager<SearchCluster.Node> {
+public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
@@ -128,8 +119,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
// Only use direct dispatch if we have exactly 1 search node on the same machine:
if (localSearchNodes.size() != 1) return Optional.empty();
- SearchCluster.Node localSearchNode = localSearchNodes.iterator().next();
- SearchCluster.Group localSearchGroup = groups.get(localSearchNode.group());
+ Node localSearchNode = localSearchNodes.iterator().next();
+ Group localSearchGroup = groups.get(localSearchNode.group());
// Only use direct dispatch if the local search node has the entire corpus
if (localSearchGroup.nodes().size() != 1) return Optional.empty();
@@ -188,7 +179,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
if ( ! directDispatchTarget.isPresent()) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- SearchCluster.Group localSearchGroup = groups.get(directDispatchTarget.get().group());
+ Group localSearchGroup = groups.get(directDispatchTarget.get().group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is up
@@ -216,10 +207,6 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
vipStatus.removeFromRotation(this);
}
- public void groupConnectionFailure(Group group) {
- group.setHasSufficientCoverage(false); // will be reset after next ping iteration
- }
-
private void updateSufficientCoverage(Group group, boolean sufficientCoverage) {
// update VIP status if we direct dispatch to this group and coverage status changed
if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) {
@@ -245,7 +232,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
/** Used by the cluster monitor to manage node status */
@Override
public void ping(Node node, Executor executor) {
- Pinger pinger = new Pinger(node);
+ Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool);
FutureTask<Pong> futurePong = new FutureTask<>(pinger);
executor.execute(futurePong);
Pong pong = getPong(futurePong, node);
@@ -287,29 +274,34 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
Group group = orderedGroups.get(i);
long activeDocuments = activeDocumentsInGroup[i];
long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1);
- boolean sufficientCoverage = true;
-
- if (averageDocumentsInOtherGroups > 0) {
- double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups;
- sufficientCoverage = coverage >= minActivedocsCoveragePercentage;
- }
- if (sufficientCoverage) {
- sufficientCoverage = isNodeCoverageSufficient(group);
- }
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.nodes(), activeDocuments, averageDocumentsInOtherGroups);
updateSufficientCoverage(group, sufficientCoverage);
}
}
- private boolean isNodeCoverageSufficient(Group group) {
+ private boolean isGroupCoverageSufficient(List<Node> nodes, long activeDocuments, long averageDocumentsInOtherGroups) {
+ boolean sufficientCoverage = true;
+
+ if (averageDocumentsInOtherGroups > 0) {
+ double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups;
+ sufficientCoverage = coverage >= minActivedocsCoveragePercentage;
+ }
+ if (sufficientCoverage) {
+ sufficientCoverage = isGroupNodeCoverageSufficient(nodes);
+ }
+ return sufficientCoverage;
+ }
+
+ private boolean isGroupNodeCoverageSufficient(List<Node> nodes) {
int nodesUp = 0;
- for (Node node : group.nodes()) {
+ for (Node node : nodes) {
if (node.isWorking()) {
nodesUp++;
}
}
- int nodes = group.nodes().size();
- int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) nodes * (100.0 - minGroupCoverage)) / 100.0);
- return nodesUp + nodesAllowedDown >= nodes;
+ int numNodes = nodes.size();
+ int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) numNodes * (100.0 - minGroupCoverage)) / 100.0);
+ return nodesUp + nodesAllowedDown >= numNodes;
}
private Pong getPong(FutureTask<Pong> futurePong, Node node) {
@@ -326,153 +318,26 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
}
}
- private class Pinger implements Callable<Pong> {
-
- private final Node node;
-
- public Pinger(Node node) {
- this.node = node;
- }
-
- public Pong call() {
- try {
- Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
- fs4ResourcePool.getBackend(node.hostname(), node.fs4port()), node.toString());
- if (pong.activeDocuments().isPresent())
- node.setActiveDocuments(pong.activeDocuments().get());
- return pong;
- } catch (RuntimeException e) {
- return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
- + Exceptions.toMessageString(e)));
- }
- }
-
- }
-
- /** A group in a search cluster. This class is multithread safe. */
- public static class Group {
-
- private final int id;
- private final ImmutableList<Node> nodes;
-
- private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
- private final AtomicLong activeDocuments = new AtomicLong(0);
-
- public Group(int id, List<Node> nodes) {
- this.id = id;
- this.nodes = ImmutableList.copyOf(nodes);
- }
-
- /** Returns the unique identity of this group */
- public int id() { return id; }
-
- /** Returns the nodes in this group as an immutable list */
- public ImmutableList<Node> nodes() { return nodes; }
-
- /**
- * Returns whether this group has sufficient active documents
- * (compared to other groups) that is should receive traffic
- */
- public boolean hasSufficientCoverage() {
- return hasSufficientCoverage.get();
- }
-
- void setHasSufficientCoverage(boolean sufficientCoverage) {
- hasSufficientCoverage.lazySet(sufficientCoverage);
+ /**
+ * Calculate whether a subset of nodes in a group has enough coverage
+ */
+ public boolean isPartialGroupCoverageSufficient(int groupId, List<Node> nodes) {
+ if (orderedGroups.size() == 1) {
+ return true;
}
-
- void aggregateActiveDocuments() {
- long activeDocumentsInGroup = 0;
- for (Node node : nodes) {
- if (node.isWorking()) {
- activeDocumentsInGroup += node.getActiveDocuments();
- }
+ long sumOfActiveDocuments = 0;
+ int otherGroups = 0;
+ for (Group g : orderedGroups) {
+ if (g.id() != groupId) {
+ sumOfActiveDocuments += g.getActiveDocuments();
+ otherGroups++;
}
- activeDocuments.set(activeDocumentsInGroup);
-
}
-
- /** Returns the active documents on this node. If unknown, 0 is returned. */
- long getActiveDocuments() {
- return this.activeDocuments.get();
+ long activeDocuments = 0;
+ for (Node n : nodes) {
+ activeDocuments += n.getActiveDocuments();
}
-
- @Override
- public String toString() { return "search group " + id; }
-
- @Override
- public int hashCode() { return id; }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) return true;
- if (!(other instanceof Group)) return false;
- return ((Group) other).id == this.id;
- }
-
+ long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups;
+ return isGroupCoverageSufficient(nodes, activeDocuments, averageDocumentsInOtherGroups);
}
-
- /** A node in a search cluster. This class is multithread safe. */
- public static class Node {
-
- private final int key;
- private final String hostname;
- private final int fs4port;
- private final int group;
-
- private final AtomicBoolean working = new AtomicBoolean(true);
- private final AtomicLong activeDocuments = new AtomicLong(0);
-
- public Node(int key, String hostname, int fs4port, int group) {
- this.key = key;
- this.hostname = hostname;
- this.fs4port = fs4port;
- this.group = group;
- }
-
- /** Returns the unique and stable distribution key of this node */
- public int key() { return key; }
-
- public String hostname() { return hostname; }
-
- public int fs4port() { return fs4port; }
-
- /** Returns the id of this group this node belongs to */
- public int group() { return group; }
-
- void setWorking(boolean working) {
- this.working.lazySet(working);
- }
-
- /** Returns whether this node is currently responding to requests */
- public boolean isWorking() { return working.get(); }
-
- /** Updates the active documents on this node */
- void setActiveDocuments(long activeDocuments) {
- this.activeDocuments.set(activeDocuments);
- }
-
- /** Returns the active documents on this node. If unknown, 0 is returned. */
- public long getActiveDocuments() {
- return this.activeDocuments.get();
- }
-
- @Override
- public int hashCode() { return Objects.hash(hostname, fs4port); }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) return true;
- if ( ! (o instanceof Node)) return false;
- Node other = (Node)o;
- if ( ! Objects.equals(this.hostname, other.hostname)) return false;
- if ( ! Objects.equals(this.fs4port, other.fs4port)) return false;
- return true;
- }
-
- @Override
- public String toString() { return "search node " + hostname + ":" + fs4port + " in group " + group; }
-
- }
-
}