summaryrefslogtreecommitdiffstats
path: root/container-search/src/main
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-25 19:19:45 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-25 19:19:45 +0100
commit6b2223c7e5897929a0570434d66ff08f554b6317 (patch)
tree00dcf18399fc40a69c057212bf49bf2e71d780e7 /container-search/src/main
parentb132f003f1bb2447ebc8422c6fbc318fb8c92d17 (diff)
Let RpcResourcePool handle live changes.
Diffstat (limited to 'container-search/src/main')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java41
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java3
3 files changed, 37 insertions, 16 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 762438aa489..b9c2b40134a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -36,7 +36,7 @@ public class RpcClient implements Client {
return new RpcNodeConnection(hostname, port, supervisor);
}
- private static class RpcNodeConnection implements NodeConnection {
+ static class RpcNodeConnection implements NodeConnection {
// Information about the connected node
private final Supervisor supervisor;
@@ -56,6 +56,13 @@ public class RpcClient implements Client {
target = supervisor.connect(new Spec(hostname, port));
}
+ public String getHostname() {
+ return hostname;
+ }
+ public int getPort() {
+ return port;
+ }
+
@Override
public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
ResponseReceiver responseReceiver, double timeoutSeconds) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index db95921c47b..afaa9afab33 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -1,14 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
-import com.google.common.collect.ImmutableMap;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -22,31 +21,45 @@ import java.util.concurrent.ThreadLocalRandom;
public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
- private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
+ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
+ private final int numConnections;
private final RpcClient rpcClient;
RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
- var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
- nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
- this.nodeConnectionPools = builder.build();
+ var builder = new HashMap<Integer, NodeConnectionPool>();
+ nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(List.of(connection))));
+ this.nodeConnectionPools = Map.copyOf(builder);
this.rpcClient = null;
+ this.numConnections = 1;
}
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
+ numConnections = dispatchConfig.numJrtConnectionsPerNode();
+ updateNodes(nodesConfig);
+ }
- // Create rpc node connection pools indexed by the node distribution key
- int numConnections = dispatchConfig.numJrtConnectionsPerNode();
- var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ public void updateNodes(DispatchNodesConfig nodesConfig) {
+ var builder = new HashMap<Integer, NodeConnectionPool>();
for (var node : nodesConfig.node()) {
- var connections = new ArrayList<NodeConnection>(numConnections);
- for (int i = 0; i < numConnections; i++) {
- connections.add(rpcClient.createConnection(node.host(), node.port()));
+ var prev = nodeConnectionPools.get(node.key());
+ NodeConnection nc = prev != null ? prev.nextConnection() : null;
+ if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection
+ && rpcNodeConnection.getPort() == node.port()
+ && rpcNodeConnection.getHostname().equals(node.host()))
+ {
+ builder.put(node.key(), prev);
+ } else {
+ if (prev != null) prev.release();
+ var connections = new ArrayList<NodeConnection>(numConnections);
+ for (int i = 0; i < numConnections; i++) {
+ connections.add(rpcClient.createConnection(node.host(), node.port()));
+ }
+ builder.put(node.key(), new NodeConnectionPool(connections));
}
- builder.put(node.key(), new NodeConnectionPool(connections));
}
- this.nodeConnectionPools = builder.build();
+ this.nodeConnectionPools = Map.copyOf(builder);
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 2783b45f724..e755abc7d66 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -41,7 +41,7 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Node localCorpusDispatchTarget;
- public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes,
+ public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes,
VipStatus vipStatus, PingFactory pingFactory) {
this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory);
}
@@ -55,6 +55,7 @@ public class SearchCluster implements NodeManager<Node> {
@Override
public String name() { return clusterId; }
+ public VipStatus getVipStatus() { return vipStatus; }
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
for (var group : groups()) {