aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-25 19:19:45 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-25 19:19:45 +0100
commit6b2223c7e5897929a0570434d66ff08f554b6317 (patch)
tree00dcf18399fc40a69c057212bf49bf2e71d780e7 /container-search
parentb132f003f1bb2447ebc8422c6fbc318fb8c92d17 (diff)
Let RpcResourcePool handle live changes.
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java41
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java3
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java18
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java14
5 files changed, 69 insertions, 16 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 762438aa489..b9c2b40134a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -36,7 +36,7 @@ public class RpcClient implements Client {
return new RpcNodeConnection(hostname, port, supervisor);
}
- private static class RpcNodeConnection implements NodeConnection {
+ static class RpcNodeConnection implements NodeConnection {
// Information about the connected node
private final Supervisor supervisor;
@@ -56,6 +56,13 @@ public class RpcClient implements Client {
target = supervisor.connect(new Spec(hostname, port));
}
+ public String getHostname() {
+ return hostname;
+ }
+ public int getPort() {
+ return port;
+ }
+
@Override
public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
ResponseReceiver responseReceiver, double timeoutSeconds) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index db95921c47b..afaa9afab33 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -1,14 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
-import com.google.common.collect.ImmutableMap;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -22,31 +21,45 @@ import java.util.concurrent.ThreadLocalRandom;
public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
- private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
+ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
+ private final int numConnections;
private final RpcClient rpcClient;
RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
- var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
- nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
- this.nodeConnectionPools = builder.build();
+ var builder = new HashMap<Integer, NodeConnectionPool>();
+ nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(List.of(connection))));
+ this.nodeConnectionPools = Map.copyOf(builder);
this.rpcClient = null;
+ this.numConnections = 1;
}
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
+ numConnections = dispatchConfig.numJrtConnectionsPerNode();
+ updateNodes(nodesConfig);
+ }
- // Create rpc node connection pools indexed by the node distribution key
- int numConnections = dispatchConfig.numJrtConnectionsPerNode();
- var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ public void updateNodes(DispatchNodesConfig nodesConfig) {
+ var builder = new HashMap<Integer, NodeConnectionPool>();
for (var node : nodesConfig.node()) {
- var connections = new ArrayList<NodeConnection>(numConnections);
- for (int i = 0; i < numConnections; i++) {
- connections.add(rpcClient.createConnection(node.host(), node.port()));
+ var prev = nodeConnectionPools.get(node.key());
+ NodeConnection nc = prev != null ? prev.nextConnection() : null;
+ if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection
+ && rpcNodeConnection.getPort() == node.port()
+ && rpcNodeConnection.getHostname().equals(node.host()))
+ {
+ builder.put(node.key(), prev);
+ } else {
+ if (prev != null) prev.release();
+ var connections = new ArrayList<NodeConnection>(numConnections);
+ for (int i = 0; i < numConnections; i++) {
+ connections.add(rpcClient.createConnection(node.host(), node.port()));
+ }
+ builder.put(node.key(), new NodeConnectionPool(connections));
}
- builder.put(node.key(), new NodeConnectionPool(connections));
}
- this.nodeConnectionPools = builder.build();
+ this.nodeConnectionPools = Map.copyOf(builder);
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 2783b45f724..e755abc7d66 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -41,7 +41,7 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Node localCorpusDispatchTarget;
- public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes,
+ public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes,
VipStatus vipStatus, PingFactory pingFactory) {
this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory);
}
@@ -55,6 +55,7 @@ public class SearchCluster implements NodeManager<Node> {
@Override
public String name() { return clusterId; }
+ public VipStatus getVipStatus() { return vipStatus; }
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
for (var group : groups()) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index 82b7845d63d..ff37352b39c 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -16,6 +16,8 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createDispatchConfig;
+import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createNodesConfig;
import static org.junit.jupiter.api.Assertions.*;
/**
@@ -71,6 +73,22 @@ public class RpcSearchInvokerTest {
assertEquals(maxHits, request.getHits());
}
+ void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup) {
+ rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup));
+ for (int nodeId = 0; nodeId < numGroups*nodesPerGroup; nodeId++) {
+ assertTrue(rpcResourcePool.getConnection(nodeId) instanceof RpcClient.RpcNodeConnection);
+ }
+ assertNull(rpcResourcePool.getConnection(numGroups*nodesPerGroup));
+ }
+
+ @Test
+ void testUpdateOfRpcResourcePool() {
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(createDispatchConfig(), createNodesConfig(0, 0));
+ verifyConnections(rpcResourcePool, 3,3);
+ verifyConnections(rpcResourcePool, 4,4);
+ verifyConnections(rpcResourcePool, 2,2);
+ }
+
private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder,
AtomicInteger lengthHolder) {
return new Client() {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
index cbf6273d3ae..5fb5b465c69 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
@@ -2,6 +2,7 @@
package com.yahoo.search.dispatch.searchcluster;
import com.yahoo.vespa.config.search.DispatchConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
import java.util.HashMap;
@@ -52,6 +53,19 @@ public class MockSearchCluster extends SearchCluster {
return builder;
}
+ public static DispatchNodesConfig createNodesConfig(int numGroups, int nodesPerGroup) {
+ var builder = new DispatchNodesConfig.Builder();
+ int key = 0;
+ for (int g = 0; g < numGroups; g++) {
+ for (int i = 0; i < nodesPerGroup; i++) {
+ var nodeBuilder = new DispatchNodesConfig.Node.Builder();
+ nodeBuilder.key(key++).port(0).group(g).host("host" + g + "." + i);
+ builder.node.add(nodeBuilder);
+ }
+ }
+ return builder.build();
+ }
+
public static SearchGroupsImpl buildGroupListForTest(int numGroups, int nodesPerGroup, double minActivedocsPercentage) {
return new SearchGroupsImpl(buildGroupMapForTest(numGroups, nodesPerGroup), minActivedocsPercentage);
}