summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java23
3 files changed, 26 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 6f6b0fc2b79..eca0c8058a1 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -121,7 +121,7 @@ public class Dispatcher extends AbstractComponent {
DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
this(dispatchConfig, rpcConnectionPool,
new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
invokerFactories);
}
@@ -180,7 +180,7 @@ public class Dispatcher extends AbstractComponent {
* under the assumption that this is the common case, i.e., new nodes have no documents yet.
*/
void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
- try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up.
+ try (var items = volatileItems()) { // Mark a reference to the old snapshot, which we want to have cleaned up.
items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.
// Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
@@ -192,7 +192,7 @@ public class Dispatcher extends AbstractComponent {
};
// Update the nodes the search cluster keeps track of, and what nodes are monitored.
- ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+ ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
this.volatileItems = update(newMonitor);
@@ -234,9 +234,9 @@ public class Dispatcher extends AbstractComponent {
case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
};
}
- private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
+ private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) {
return nodesConfig.node().stream()
- .map(n -> new Node(n.key(), n.host(), n.group()))
+ .map(n -> new Node(clusterName, n.key(), n.host(), n.group()))
.toList();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index aeb04bfb141..31e02f910ee 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class Node {
+ private final String clusterName;
private final int key;
private final String hostname;
private final int group;
@@ -25,7 +26,8 @@ public class Node {
private volatile boolean working = true;
private volatile boolean isBlockingWrites = false;
- public Node(int key, String hostname, int group) {
+ public Node(String clusterName, int key, String hostname, int group) {
+ this.clusterName = clusterName;
this.key = key;
this.hostname = hostname;
this.group = group;
@@ -33,7 +35,7 @@ public class Node {
/** Give a monotonically increasing sequence number.*/
public long createPingSequenceId() { return pingSequence.incrementAndGet(); }
- /** Checks if this pong is received in line and accepted, or out of band and should be ignored..*/
+ /** Checks if this pong is received in line and accepted, or out of band and should be ignored. */
public boolean isLastReceivedPong(long pingId ) {
long last = lastPong.get();
while ((pingId > last) && ! lastPong.compareAndSet(last, pingId)) {
@@ -103,8 +105,8 @@ public class Node {
@Override
public String toString() {
- return "search node key = " + key + " hostname = "+ hostname + " path = " + pathIndex + " in group " + group +
- " statusIsKnown = " + statusIsKnown + " working = " + working +
+ return "search node in cluster = " + clusterName + " key = " + key + " hostname = "+ hostname +
+ " path = " + pathIndex + " in group " + group + " statusIsKnown = " + statusIsKnown + " working = " + working +
" activeDocs = " + getActiveDocuments() + " targetActiveDocs = " + getTargetActiveDocuments();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 3c8950f1f7f..59b4637a627 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -8,9 +8,9 @@ import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.yolean.UncheckedInterruptedException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -18,6 +18,7 @@ import java.util.concurrent.Executor;
import java.util.logging.Logger;
import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toMap;
/**
* A model of a search cluster we might want to dispatch queries to.
@@ -42,7 +43,7 @@ public class SearchCluster implements NodeManager<Node> {
* if it only queries this cluster when the local node cannot be used, to avoid unnecessary
* cross-node network traffic.
*/
- private final Node localCorpusDispatchTarget;
+ private volatile Node localCorpusDispatchTarget;
public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes,
VipStatus vipStatus, PingFactory pingFactory) {
@@ -62,12 +63,14 @@ public class SearchCluster implements NodeManager<Node> {
/** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */
public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
- Collection<Node> retainedNodes = groups.nodes();
- Collection<Node> currentNodes = new HashSet<>(newNodes);
- retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set.
- currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object.
- Collection<Node> addedNodes = List.copyOf(currentNodes);
- currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set.
+ List<Node> currentNodes = new ArrayList<>(newNodes);
+ List<Node> addedNodes = new ArrayList<>();
+ Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node));
+ for (int i = 0; i < currentNodes.size(); i++) {
+ Node retained = retainedNodes.get(currentNodes.get(i));
+ if (retained != null) currentNodes.set(i, retained);
+ else addedNodes.add(currentNodes.get(i));
+ }
SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage);
ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false);
for (Node node : groups.nodes()) monitor.add(node, true);
@@ -75,6 +78,7 @@ public class SearchCluster implements NodeManager<Node> {
try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } }
catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
pingIterationCompleted(groups);
+ this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups);
this.groups = groups;
return monitor;
}
@@ -139,6 +143,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateWorkingState(Node node, boolean isWorking) {
+ log.fine(() -> "Updating working state of " + node + " to " + isWorking);
node.setWorking(isWorking);
updateVipStatusOnNodeChange(node, isWorking);
}
@@ -214,6 +219,7 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) {
+ log.fine(() -> "Pinging " + node);
Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor));
pinger.ping();
}
@@ -300,6 +306,7 @@ public class SearchCluster implements NodeManager<Node> {
@Override
public void handle(Pong pong) {
+ log.fine(() -> "Got pong from " + node + ": " + pong);
if (pong.badResponse()) {
clusterMonitor.failed(node, pong.error().get());
} else {