diff options
Diffstat (limited to 'container-search/src/main/java')
3 files changed, 26 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 6f6b0fc2b79..eca0c8058a1 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -121,7 +121,7 @@ public class Dispatcher extends AbstractComponent { DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), invokerFactories); } @@ -180,7 +180,7 @@ public class Dispatcher extends AbstractComponent { * under the assumption that this is the common case, i.e., new nodes have no documents yet. */ void updateWithNewConfig(DispatchNodesConfig nodesConfig) { - try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + try (var items = volatileItems()) { // Mark a reference to the old snapshot, which we want to have cleaned up. items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. @@ -192,7 +192,7 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. this.volatileItems = update(newMonitor); @@ -234,9 +234,9 @@ public class Dispatcher extends AbstractComponent { case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME; }; } - private static List<Node> toNodes(DispatchNodesConfig nodesConfig) { + private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) { return nodesConfig.node().stream() - .map(n -> new Node(n.key(), n.host(), n.group())) + .map(n -> new Node(clusterName, n.key(), n.host(), n.group())) .toList(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index aeb04bfb141..31e02f910ee 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class Node { + private final String clusterName; private final int key; private final String hostname; private final int group; @@ -25,7 +26,8 @@ public class Node { private volatile boolean working = true; private volatile boolean isBlockingWrites = false; - public Node(int key, String hostname, int group) { + public Node(String clusterName, int key, String hostname, int group) { + this.clusterName = clusterName; this.key = key; this.hostname = hostname; this.group = group; @@ -33,7 +35,7 @@ public class Node { /** Give a monotonically increasing sequence number.*/ public long createPingSequenceId() { return pingSequence.incrementAndGet(); } - /** Checks if this pong is received in line and accepted, or out of band and should be ignored..*/ + /** Checks if this pong is received in line and accepted, or out of band and should be ignored. */ public boolean isLastReceivedPong(long pingId ) { long last = lastPong.get(); while ((pingId > last) && ! lastPong.compareAndSet(last, pingId)) { @@ -103,8 +105,8 @@ public class Node { @Override public String toString() { - return "search node key = " + key + " hostname = "+ hostname + " path = " + pathIndex + " in group " + group + - " statusIsKnown = " + statusIsKnown + " working = " + working + + return "search node in cluster = " + clusterName + " key = " + key + " hostname = "+ hostname + + " path = " + pathIndex + " in group " + group + " statusIsKnown = " + statusIsKnown + " working = " + working + " activeDocs = " + getActiveDocuments() + " targetActiveDocs = " + getTargetActiveDocuments(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 3c8950f1f7f..59b4637a627 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -8,9 +8,9 @@ import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; import com.yahoo.yolean.UncheckedInterruptedException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -18,6 +18,7 @@ import java.util.concurrent.Executor; import java.util.logging.Logger; import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toMap; /** * A model of a search cluster we might want to dispatch queries to. @@ -42,7 +43,7 @@ public class SearchCluster implements NodeManager<Node> { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Node localCorpusDispatchTarget; + private volatile Node localCorpusDispatchTarget; public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes, VipStatus vipStatus, PingFactory pingFactory) { @@ -62,12 +63,14 @@ public class SearchCluster implements NodeManager<Node> { /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { - Collection<Node> retainedNodes = groups.nodes(); - Collection<Node> currentNodes = new HashSet<>(newNodes); - retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set. - currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object. - Collection<Node> addedNodes = List.copyOf(currentNodes); - currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set. + List<Node> currentNodes = new ArrayList<>(newNodes); + List<Node> addedNodes = new ArrayList<>(); + Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node)); + for (int i = 0; i < currentNodes.size(); i++) { + Node retained = retainedNodes.get(currentNodes.get(i)); + if (retained != null) currentNodes.set(i, retained); + else addedNodes.add(currentNodes.get(i)); + } SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); for (Node node : groups.nodes()) monitor.add(node, true); @@ -75,6 +78,7 @@ public class SearchCluster implements NodeManager<Node> { try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } pingIterationCompleted(groups); + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); this.groups = groups; return monitor; } @@ -139,6 +143,7 @@ public class SearchCluster implements NodeManager<Node> { } private void updateWorkingState(Node node, boolean isWorking) { + log.fine(() -> "Updating working state of " + node + " to " + isWorking); node.setWorking(isWorking); updateVipStatusOnNodeChange(node, isWorking); } @@ -214,6 +219,7 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) { + log.fine(() -> "Pinging " + node); Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); pinger.ping(); } @@ -300,6 +306,7 @@ public class SearchCluster implements NodeManager<Node> { @Override public void handle(Pong pong) { + log.fine(() -> "Got pong from " + node + ": " + pong); if (pong.badResponse()) { clusterMonitor.failed(node, pong.error().get()); } else { |