diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 3c8950f1f7f..f7a77ebf963 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,11 +6,10 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.yolean.UncheckedInterruptedException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -18,6 +17,7 @@ import java.util.concurrent.Executor; import java.util.logging.Logger; import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; /** * A model of a search cluster we might want to dispatch queries to. @@ -42,7 +42,7 @@ public class SearchCluster implements NodeManager<Node> { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Node localCorpusDispatchTarget; + private volatile Node localCorpusDispatchTarget; public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes, VipStatus vipStatus, PingFactory pingFactory) { @@ -61,22 +61,19 @@ public class SearchCluster implements NodeManager<Node> { public String name() { return clusterId; } /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ - public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { - Collection<Node> retainedNodes = groups.nodes(); - Collection<Node> currentNodes = new HashSet<>(newNodes); - retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set. - currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object. - Collection<Node> addedNodes = List.copyOf(currentNodes); - currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set. + public void updateNodes(Collection<Node> newNodes, ClusterMonitor<Node> monitor, double minActivedocsPercentage) { + List<Node> currentNodes = new ArrayList<>(newNodes); + List<Node> addedNodes = new ArrayList<>(); + Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node)); + for (int i = 0; i < currentNodes.size(); i++) { + Node retained = retainedNodes.get(currentNodes.get(i)); + if (retained != null) currentNodes.set(i, retained); + else addedNodes.add(currentNodes.get(i)); + } SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); - ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); - for (Node node : groups.nodes()) monitor.add(node, true); - monitor.start(); - try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } - catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } - pingIterationCompleted(groups); + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); + monitor.reconfigure(groups.nodes()); this.groups = groups; - return monitor; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { @@ -139,6 +136,7 @@ public class SearchCluster implements NodeManager<Node> { } private void updateWorkingState(Node node, boolean isWorking) { + log.fine(() -> "Updating working state of " + node + " to " + isWorking); node.setWorking(isWorking); updateVipStatusOnNodeChange(node, isWorking); } @@ -214,6 +212,7 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) { + log.fine(() -> "Pinging " + node); Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); pinger.ping(); } @@ -300,6 +299,7 @@ public class SearchCluster implements NodeManager<Node> { @Override public void handle(Pong pong) { + log.fine(() -> "Got pong from " + node + ": " + pong); if (pong.badResponse()) { clusterMonitor.failed(node, pong.error().get()); } else { |