diff options
3 files changed, 20 insertions, 12 deletions
diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def index 477a781ebbc..c3f847fb5f3 100644 --- a/configdefinitions/src/vespa/dispatch.def +++ b/configdefinitions/src/vespa/dispatch.def @@ -40,8 +40,11 @@ minWaitAfterCoverageFactor double default=0 # Maximum wait time for full coverage after minimum coverage is achieved, factored based on time left at minimum coverage maxWaitAfterCoverageFactor double default=1 -# Number of JRT connection supervisors -numJrtSupervisors int default=8 +# Number of JRT transport threads +numJrtTransportThreads int default=8 + +# Number of JRT connections per backend node +numJrtConnectionsPerNode int default=8 # The unique key of a search node node[].key int diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 7e48733106a..ea123b255eb 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -22,7 +22,11 @@ import java.util.List; * @author bratseth */ class RpcClient implements Client { - private final Supervisor supervisor = new Supervisor(new Transport()); + private final Supervisor supervisor; + + public RpcClient(int transportThreads) { + supervisor = new Supervisor(new Transport(transportThreads)); + } @Override public NodeConnection createConnection(String hostname, int port) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index cccf8dd3693..ca2a0c9bfb0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -33,23 +33,23 @@ public class RpcResourcePool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; - public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) { + public RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) { var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); this.nodeConnectionPools = builder.build(); } public RpcResourcePool(DispatchConfig dispatchConfig) { - var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors()); - for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) { - clients.add(new RpcClient()); - } + var client = new RpcClient(dispatchConfig.numJrtTransportThreads()); - // Create node rpc connection pools, indexed by the node distribution key + // Create rpc node connection pools indexed by the node distribution key var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); + var numConnections = dispatchConfig.numJrtConnectionsPerNode(); for (var node : dispatchConfig.node()) { - var connections = new ArrayList<Client.NodeConnection>(clients.size()); - clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port()))); + var connections = new ArrayList<NodeConnection>(numConnections); + for (int i = 0; i < numConnections; i++) { + connections.add(client.createConnection(node.host(), node.port())); + } builder.put(node.key(), new NodeConnectionPool(connections)); } this.nodeConnectionPools = builder.build(); @@ -80,7 +80,7 @@ public class RpcResourcePool { private class NodeConnectionPool { private final List<Client.NodeConnection> connections; - NodeConnectionPool(List<Client.NodeConnection> connections) { + NodeConnectionPool(List<NodeConnection> connections) { this.connections = connections; } @@ -93,4 +93,5 @@ public class RpcResourcePool { connections.forEach(Client.NodeConnection::close); } } + } |