summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java134
1 files changed, 73 insertions, 61 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index c091e211aca..c17212b2481 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -55,7 +55,7 @@ public class SearchCluster implements NodeManager<Node> {
* if it only queries this cluster when the local node cannot be used, to avoid unnecessary
* cross-node network traffic.
*/
- private final Optional<Node> directDispatchTarget;
+ private final Optional<Node> localCorpusDispatchTarget;
public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) {
this.clusterId = clusterId;
@@ -82,8 +82,11 @@ public class SearchCluster implements NodeManager<Node> {
nodesByHostBuilder.put(node.hostname(), node);
this.nodesByHost = nodesByHostBuilder.build();
- this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize,
- nodesByHost, groups);
+ this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(),
+ size,
+ containerClusterSize,
+ nodesByHost,
+ groups);
this.clusterMonitor = new ClusterMonitor<>(this);
}
@@ -92,21 +95,18 @@ public class SearchCluster implements NodeManager<Node> {
this.pingFactory = pingFactory;
for (var group : orderedGroups) {
- for (var node : group.nodes()) {
- // cluster monitor will only call working() when the
- // node transitions from down to up, so we need to
- // register the initial (working) state here:
- working(node);
+ for (var node : group.nodes())
clusterMonitor.add(node, true);
- }
}
}
- private static Optional<Node> findDirectDispatchTarget(String selfHostname,
- int searchClusterSize,
- int containerClusterSize,
- ImmutableMultimap<String, Node> nodesByHost,
- ImmutableMap<Integer, Group> groups) {
+ ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; }
+
+ private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
+ int searchClusterSize,
+ int containerClusterSize,
+ ImmutableMultimap<String, Node> nodesByHost,
+ ImmutableMap<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
// It has all the data <==> No other nodes in the search cluster have the same group id as this node.
// That local search node responds.
@@ -186,70 +186,90 @@ public class SearchCluster implements NodeManager<Node> {
}
/**
- * Returns the recipient we should dispatch queries directly to (bypassing fdispatch),
+ * Returns the single, local node we should dispatch queries directly to,
* or empty if we should not dispatch directly.
*/
- public Optional<Node> directDispatchTarget() {
- if ( directDispatchTarget.isEmpty()) return Optional.empty();
+ public Optional<Node> localCorpusDispatchTarget() {
+ if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups.get(directDispatchTarget.get().group());
+ Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
- // Only use direct dispatch if the local search node is up
- if ( ! directDispatchTarget.get().isWorking()) return Optional.empty();
+ // Only use direct dispatch if the local search node is not down
+ if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty();
- return directDispatchTarget;
+ return localCorpusDispatchTarget;
}
- /** Used by the cluster monitor to manage node status */
+ /** Called by the cluster monitor when node state changes to working */
@Override
public void working(Node node) {
node.setWorking(true);
-
- if (usesDirectDispatchTo(node))
- vipStatus.addToRotation(clusterId);
+ updateVipStatusOnNodeChange(node, true);
}
- /** Used by the cluster monitor to manage node status */
+ /** Called by the cluster monitor when node state changes to failed */
@Override
public void failed(Node node) {
node.setWorking(false);
-
- // Take ourselves out if we usually dispatch only to our own host
- if (usesDirectDispatchTo(node))
- vipStatus.removeFromRotation(clusterId);
+ updateVipStatusOnNodeChange(node, true);
}
private void updateSufficientCoverage(Group group, boolean sufficientCoverage) {
- // update VIP status if we direct dispatch to this group and coverage status changed
- boolean isInRotation = vipStatus.isInRotation();
- boolean hasChanged = sufficientCoverage != group.hasSufficientCoverage();
- boolean isDirectDispatchGroupAndChange = usesDirectDispatchTo(group) && hasChanged;
+ if (sufficientCoverage == group.hasSufficientCoverage()) return; // no change
+
group.setHasSufficientCoverage(sufficientCoverage);
- if ((!isInRotation || isDirectDispatchGroupAndChange) && sufficientCoverage) {
- // We will set this cluster in rotation if
- // - not already in rotation and one group has sufficient coverage.
+ updateVipStatusOnCoverageChange(group, sufficientCoverage);
+ }
+
+ private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) {
+ if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if (hasInformationAboutAllNodes())
+ setInRotationOnlyIf(hasWorkingNodes());
+ }
+ else if (usesLocalCorpusIn(node)) { // follow the status of this node
+ setInRotationOnlyIf(nodeIsWorking);
+ }
+ }
+
+ private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) {
+ if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ // VIP status does not depend on coverage
+ }
+ else if (usesLocalCorpusIn(group)) { // follow the status of this group
+ setInRotationOnlyIf(sufficientCoverage);
+ }
+ }
+
+ private void setInRotationOnlyIf(boolean inRotation) {
+ if (inRotation)
vipStatus.addToRotation(clusterId);
- } else if (isDirectDispatchGroupAndChange) {
- // We will take it out of rotation if the group is mandatory (direct dispatch to this group)
+ else
vipStatus.removeFromRotation(clusterId);
- }
}
- private boolean usesDirectDispatchTo(Node node) {
- return directDispatchTarget.isPresent() && directDispatchTarget.get().equals(node);
+ private boolean hasInformationAboutAllNodes() {
+ return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null);
+ }
+
+ private boolean hasWorkingNodes() {
+ return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
+ }
+
+ private boolean usesLocalCorpusIn(Node node) {
+ return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node);
}
- private boolean usesDirectDispatchTo(Group group) {
- return directDispatchTarget.isPresent() && directDispatchTarget.get().group() == group.id();
+ private boolean usesLocalCorpusIn(Group group) {
+ return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
}
/** Used by the cluster monitor to manage node status */
@Override
public void ping(Node node, Executor executor) {
- if (pingFactory == null) // not initialized yet
- return;
+ if (pingFactory == null) return; // not initialized yet
+
FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor));
executor.execute(futurePong);
Pong pong = getPong(futurePong, node);
@@ -272,9 +292,10 @@ public class SearchCluster implements NodeManager<Node> {
// group will always be marked sufficient for use.
updateSufficientCoverage(group, true);
boolean fullCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), group.getActiveDocuments(),
- group.getActiveDocuments());
+ group.getActiveDocuments());
trackGroupCoverageChanges(0, group, fullCoverage, group.getActiveDocuments());
}
+
private void pingIterationCompletedMultipleGroups() {
int numGroups = orderedGroups.size();
// Update active documents per group and use it to decide if the group should be active
@@ -299,13 +320,7 @@ public class SearchCluster implements NodeManager<Node> {
trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups);
}
}
- private boolean areAllNodesDownInAllgroups() {
- for(int i = 0; i < groups.size(); i++) {
- Group group = orderedGroups.get(i);
- if (group.workingNodes() > 0) return false;
- }
- return true;
- }
+
/**
* Update statistics after a round of issuing pings.
* Note that this doesn't wait for pings to return, so it will typically accumulate data from
@@ -319,9 +334,6 @@ public class SearchCluster implements NodeManager<Node> {
} else {
pingIterationCompletedMultipleGroups();
}
- if ( areAllNodesDownInAllgroups() ) {
- vipStatus.removeFromRotation(clusterId);
- }
}
private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) {
@@ -397,11 +409,11 @@ public class SearchCluster implements NodeManager<Node> {
if (changed) {
int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup();
if (fullCoverage) {
- log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", index,
- group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize()));
+ log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)",
+ index, group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize()));
} else {
- log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)", index, group.workingNodes(), groupSize(),
- requiredNodes));
+ log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)",
+ index, group.workingNodes(), groupSize(), requiredNodes));
}
}
}