diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 134 |
1 files changed, 61 insertions, 73 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index c17212b2481..c091e211aca 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -55,7 +55,7 @@ public class SearchCluster implements NodeManager<Node> { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Optional<Node> localCorpusDispatchTarget; + private final Optional<Node> directDispatchTarget; public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) { this.clusterId = clusterId; @@ -82,11 +82,8 @@ public class SearchCluster implements NodeManager<Node> { nodesByHostBuilder.put(node.hostname(), node); this.nodesByHost = nodesByHostBuilder.build(); - this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), - size, - containerClusterSize, - nodesByHost, - groups); + this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, + nodesByHost, groups); this.clusterMonitor = new ClusterMonitor<>(this); } @@ -95,18 +92,21 @@ public class SearchCluster implements NodeManager<Node> { this.pingFactory = pingFactory; for (var group : orderedGroups) { - for (var node : group.nodes()) + for (var node : group.nodes()) { + // cluster monitor will only call working() when the + // node transitions from down to up, so we need to + // register the initial (working) state here: + working(node); clusterMonitor.add(node, true); + } } } - ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; } - - private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, - int searchClusterSize, - int containerClusterSize, - ImmutableMultimap<String, Node> nodesByHost, - ImmutableMap<Integer, Group> groups) { + private static Optional<Node> findDirectDispatchTarget(String selfHostname, + int searchClusterSize, + int containerClusterSize, + ImmutableMultimap<String, Node> nodesByHost, + ImmutableMap<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. @@ -186,90 +186,70 @@ public class SearchCluster implements NodeManager<Node> { } /** - * Returns the single, local node we should dispatch queries directly to, + * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), * or empty if we should not dispatch directly. */ - public Optional<Node> localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty(); + public Optional<Node> directDispatchTarget() { + if ( directDispatchTarget.isEmpty()) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups.get(localCorpusDispatchTarget.get().group()); + Group localSearchGroup = groups.get(directDispatchTarget.get().group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); - // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty(); + // Only use direct dispatch if the local search node is up + if ( ! directDispatchTarget.get().isWorking()) return Optional.empty(); - return localCorpusDispatchTarget; + return directDispatchTarget; } - /** Called by the cluster monitor when node state changes to working */ + /** Used by the cluster monitor to manage node status */ @Override public void working(Node node) { node.setWorking(true); - updateVipStatusOnNodeChange(node, true); + + if (usesDirectDispatchTo(node)) + vipStatus.addToRotation(clusterId); } - /** Called by the cluster monitor when node state changes to failed */ + /** Used by the cluster monitor to manage node status */ @Override public void failed(Node node) { node.setWorking(false); - updateVipStatusOnNodeChange(node, true); + + // Take ourselves out if we usually dispatch only to our own host + if (usesDirectDispatchTo(node)) + vipStatus.removeFromRotation(clusterId); } private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { - if (sufficientCoverage == group.hasSufficientCoverage()) return; // no change - + // update VIP status if we direct dispatch to this group and coverage status changed + boolean isInRotation = vipStatus.isInRotation(); + boolean hasChanged = sufficientCoverage != group.hasSufficientCoverage(); + boolean isDirectDispatchGroupAndChange = usesDirectDispatchTo(group) && hasChanged; group.setHasSufficientCoverage(sufficientCoverage); - updateVipStatusOnCoverageChange(group, sufficientCoverage); - } - - private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) { - if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster - if (hasInformationAboutAllNodes()) - setInRotationOnlyIf(hasWorkingNodes()); - } - else if (usesLocalCorpusIn(node)) { // follow the status of this node - setInRotationOnlyIf(nodeIsWorking); - } - } - - private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) { - if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster - // VIP status does not depend on coverage - } - else if (usesLocalCorpusIn(group)) { // follow the status of this group - setInRotationOnlyIf(sufficientCoverage); - } - } - - private void setInRotationOnlyIf(boolean inRotation) { - if (inRotation) + if ((!isInRotation || isDirectDispatchGroupAndChange) && sufficientCoverage) { + // We will set this cluster in rotation if + // - not already in rotation and one group has sufficient coverage. vipStatus.addToRotation(clusterId); - else + } else if (isDirectDispatchGroupAndChange) { + // We will take it out of rotation if the group is mandatory (direct dispatch to this group) vipStatus.removeFromRotation(clusterId); + } } - private boolean hasInformationAboutAllNodes() { - return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null); - } - - private boolean hasWorkingNodes() { - return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); - } - - private boolean usesLocalCorpusIn(Node node) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node); + private boolean usesDirectDispatchTo(Node node) { + return directDispatchTarget.isPresent() && directDispatchTarget.get().equals(node); } - private boolean usesLocalCorpusIn(Group group) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); + private boolean usesDirectDispatchTo(Group group) { + return directDispatchTarget.isPresent() && directDispatchTarget.get().group() == group.id(); } /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { - if (pingFactory == null) return; // not initialized yet - + if (pingFactory == null) // not initialized yet + return; FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); executor.execute(futurePong); Pong pong = getPong(futurePong, node); @@ -292,10 +272,9 @@ public class SearchCluster implements NodeManager<Node> { // group will always be marked sufficient for use. updateSufficientCoverage(group, true); boolean fullCoverage = isGroupCoverageSufficient(group.workingNodes(), group.nodes().size(), group.getActiveDocuments(), - group.getActiveDocuments()); + group.getActiveDocuments()); trackGroupCoverageChanges(0, group, fullCoverage, group.getActiveDocuments()); } - private void pingIterationCompletedMultipleGroups() { int numGroups = orderedGroups.size(); // Update active documents per group and use it to decide if the group should be active @@ -320,7 +299,13 @@ public class SearchCluster implements NodeManager<Node> { trackGroupCoverageChanges(i, group, sufficientCoverage, averageDocumentsInOtherGroups); } } - + private boolean areAllNodesDownInAllgroups() { + for(int i = 0; i < groups.size(); i++) { + Group group = orderedGroups.get(i); + if (group.workingNodes() > 0) return false; + } + return true; + } /** * Update statistics after a round of issuing pings. * Note that this doesn't wait for pings to return, so it will typically accumulate data from @@ -334,6 +319,9 @@ public class SearchCluster implements NodeManager<Node> { } else { pingIterationCompletedMultipleGroups(); } + if ( areAllNodesDownInAllgroups() ) { + vipStatus.removeFromRotation(clusterId); + } } private boolean isGroupCoverageSufficient(int workingNodes, int nodesInGroup, long activeDocuments, long averageDocumentsInOtherGroups) { @@ -409,11 +397,11 @@ public class SearchCluster implements NodeManager<Node> { if (changed) { int requiredNodes = groupSize() - dispatchConfig.maxNodesDownPerGroup(); if (fullCoverage) { - log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", - index, group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize())); + log.info(() -> String.format("Group %d is now good again (%d/%d active docs, coverage %d/%d)", index, + group.getActiveDocuments(), averageDocuments, group.workingNodes(), groupSize())); } else { - log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)", - index, group.workingNodes(), groupSize(), requiredNodes)); + log.warning(() -> String.format("Coverage of group %d is only %d/%d (requires %d)", index, group.workingNodes(), groupSize(), + requiredNodes)); } } } |