diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-25 19:19:45 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-25 19:19:45 +0100 |
commit | 6b2223c7e5897929a0570434d66ff08f554b6317 (patch) | |
tree | 00dcf18399fc40a69c057212bf49bf2e71d780e7 /container-search | |
parent | b132f003f1bb2447ebc8422c6fbc318fb8c92d17 (diff) |
Let RpcResourcePool handle live changes.
Diffstat (limited to 'container-search')
5 files changed, 69 insertions, 16 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 762438aa489..b9c2b40134a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -36,7 +36,7 @@ public class RpcClient implements Client { return new RpcNodeConnection(hostname, port, supervisor); } - private static class RpcNodeConnection implements NodeConnection { + static class RpcNodeConnection implements NodeConnection { // Information about the connected node private final Supervisor supervisor; @@ -56,6 +56,13 @@ public class RpcClient implements Client { target = supervisor.connect(new Spec(hostname, port)); } + public String getHostname() { + return hostname; + } + public int getPort() { + return port; + } + @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index db95921c47b..afaa9afab33 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -1,14 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; -import com.google.common.collect.ImmutableMap; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -22,31 +21,45 @@ import java.util.concurrent.ThreadLocalRandom; public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; + private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of(); + private final int numConnections; private final RpcClient rpcClient; RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) { - var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); - nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); - this.nodeConnectionPools = builder.build(); + var builder = new HashMap<Integer, NodeConnectionPool>(); + nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(List.of(connection)))); + this.nodeConnectionPools = Map.copyOf(builder); this.rpcClient = null; + this.numConnections = 1; } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); + numConnections = dispatchConfig.numJrtConnectionsPerNode(); + updateNodes(nodesConfig); + } - // Create rpc node connection pools indexed by the node distribution key - int numConnections = dispatchConfig.numJrtConnectionsPerNode(); - var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); + public void updateNodes(DispatchNodesConfig nodesConfig) { + var builder = new HashMap<Integer, NodeConnectionPool>(); for (var node : nodesConfig.node()) { - var connections = new ArrayList<NodeConnection>(numConnections); - for (int i = 0; i < numConnections; i++) { - connections.add(rpcClient.createConnection(node.host(), node.port())); + var prev = nodeConnectionPools.get(node.key()); + NodeConnection nc = prev != null ? prev.nextConnection() : null; + if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) + { + builder.put(node.key(), prev); + } else { + if (prev != null) prev.release(); + var connections = new ArrayList<NodeConnection>(numConnections); + for (int i = 0; i < numConnections; i++) { + connections.add(rpcClient.createConnection(node.host(), node.port())); + } + builder.put(node.key(), new NodeConnectionPool(connections)); } - builder.put(node.key(), new NodeConnectionPool(connections)); } - this.nodeConnectionPools = builder.build(); + this.nodeConnectionPools = Map.copyOf(builder); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 2783b45f724..e755abc7d66 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -41,7 +41,7 @@ public class SearchCluster implements NodeManager<Node> { */ private final Node localCorpusDispatchTarget; - public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes, + public SearchCluster(String clusterId, double minActivedocsPercentage, Collection<Node> nodes, VipStatus vipStatus, PingFactory pingFactory) { this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } @@ -55,6 +55,7 @@ public class SearchCluster implements NodeManager<Node> { @Override public String name() { return clusterId; } + public VipStatus getVipStatus() { return vipStatus; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { for (var group : groups()) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index 82b7845d63d..ff37352b39c 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -16,6 +16,8 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createDispatchConfig; +import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createNodesConfig; import static org.junit.jupiter.api.Assertions.*; /** @@ -71,6 +73,22 @@ public class RpcSearchInvokerTest { assertEquals(maxHits, request.getHits()); } + void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup) { + rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup)); + for (int nodeId = 0; nodeId < numGroups*nodesPerGroup; nodeId++) { + assertTrue(rpcResourcePool.getConnection(nodeId) instanceof RpcClient.RpcNodeConnection); + } + assertNull(rpcResourcePool.getConnection(numGroups*nodesPerGroup)); + } + + @Test + void testUpdateOfRpcResourcePool() { + RpcResourcePool rpcResourcePool = new RpcResourcePool(createDispatchConfig(), createNodesConfig(0, 0)); + verifyConnections(rpcResourcePool, 3,3); + verifyConnections(rpcResourcePool, 4,4); + verifyConnections(rpcResourcePool, 2,2); + } + private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder, AtomicInteger lengthHolder) { return new Client() { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java index cbf6273d3ae..5fb5b465c69 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch.searchcluster; import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; import java.util.HashMap; @@ -52,6 +53,19 @@ public class MockSearchCluster extends SearchCluster { return builder; } + public static DispatchNodesConfig createNodesConfig(int numGroups, int nodesPerGroup) { + var builder = new DispatchNodesConfig.Builder(); + int key = 0; + for (int g = 0; g < numGroups; g++) { + for (int i = 0; i < nodesPerGroup; i++) { + var nodeBuilder = new DispatchNodesConfig.Node.Builder(); + nodeBuilder.key(key++).port(0).group(g).host("host" + g + "." + i); + builder.node.add(nodeBuilder); + } + } + return builder.build(); + } + public static SearchGroupsImpl buildGroupListForTest(int numGroups, int nodesPerGroup, double minActivedocsPercentage) { return new SearchGroupsImpl(buildGroupMapForTest(numGroups, nodesPerGroup), minActivedocsPercentage); } |