From 6b2223c7e5897929a0570434d66ff08f554b6317 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 25 Nov 2022 19:19:45 +0100 Subject: Let RpcResourcePool handle live changes. --- .../com/yahoo/search/dispatch/rpc/RpcClient.java | 9 ++++- .../yahoo/search/dispatch/rpc/RpcResourcePool.java | 41 ++++++++++++++-------- .../dispatch/searchcluster/SearchCluster.java | 3 +- 3 files changed, 37 insertions(+), 16 deletions(-) (limited to 'container-search/src/main/java/com/yahoo/search') diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 762438aa489..b9c2b40134a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -36,7 +36,7 @@ public class RpcClient implements Client { return new RpcNodeConnection(hostname, port, supervisor); } - private static class RpcNodeConnection implements NodeConnection { + static class RpcNodeConnection implements NodeConnection { // Information about the connected node private final Supervisor supervisor; @@ -56,6 +56,13 @@ public class RpcClient implements Client { target = supervisor.connect(new Spec(hostname, port)); } + public String getHostname() { + return hostname; + } + public int getPort() { + return port; + } + @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index db95921c47b..afaa9afab33 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -1,14 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; -import com.google.common.collect.ImmutableMap; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -22,31 +21,45 @@ import java.util.concurrent.ThreadLocalRandom; public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap nodeConnectionPools; + private volatile Map nodeConnectionPools = Map.of(); + private final int numConnections; private final RpcClient rpcClient; RpcResourcePool(Map nodeConnections) { - var builder = new ImmutableMap.Builder(); - nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); - this.nodeConnectionPools = builder.build(); + var builder = new HashMap(); + nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(List.of(connection)))); + this.nodeConnectionPools = Map.copyOf(builder); this.rpcClient = null; + this.numConnections = 1; } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); + numConnections = dispatchConfig.numJrtConnectionsPerNode(); + updateNodes(nodesConfig); + } - // Create rpc node connection pools indexed by the node distribution key - int numConnections = dispatchConfig.numJrtConnectionsPerNode(); - var builder = new ImmutableMap.Builder(); + public void updateNodes(DispatchNodesConfig nodesConfig) { + var builder = new HashMap(); for (var node : nodesConfig.node()) { - var connections = new ArrayList(numConnections); - for (int i = 0; i < numConnections; i++) { - connections.add(rpcClient.createConnection(node.host(), node.port())); + var prev = nodeConnectionPools.get(node.key()); + NodeConnection nc = prev != null ? prev.nextConnection() : null; + if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) + { + builder.put(node.key(), prev); + } else { + if (prev != null) prev.release(); + var connections = new ArrayList(numConnections); + for (int i = 0; i < numConnections; i++) { + connections.add(rpcClient.createConnection(node.host(), node.port())); + } + builder.put(node.key(), new NodeConnectionPool(connections)); } - builder.put(node.key(), new NodeConnectionPool(connections)); } - this.nodeConnectionPools = builder.build(); + this.nodeConnectionPools = Map.copyOf(builder); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 2783b45f724..e755abc7d66 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -41,7 +41,7 @@ public class SearchCluster implements NodeManager { */ private final Node localCorpusDispatchTarget; - public SearchCluster(String clusterId, double minActivedocsPercentage, List nodes, + public SearchCluster(String clusterId, double minActivedocsPercentage, Collection nodes, VipStatus vipStatus, PingFactory pingFactory) { this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } @@ -55,6 +55,7 @@ public class SearchCluster implements NodeManager { @Override public String name() { return clusterId; } + public VipStatus getVipStatus() { return vipStatus; } public void addMonitoring(ClusterMonitor clusterMonitor) { for (var group : groups()) { -- cgit v1.2.3