diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-05-14 16:14:52 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-05-14 16:14:52 +0200 |
commit | 9107a86f5eb88092881de69b47a7ac9739b92a28 (patch) | |
tree | 0f9fb640aac3d315088ea46d35e8f1a50fcaa6db /container-search | |
parent | 81cda304c2c58e381f334a90817dd1ea63a56fe0 (diff) |
Use one JRT Supervisor now that it supports multiple transport threads
Diffstat (limited to 'container-search')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java | 6 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java | 19 |
2 files changed, 15 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 7e48733106a..ea123b255eb 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -22,7 +22,11 @@ import java.util.List; * @author bratseth */ class RpcClient implements Client { - private final Supervisor supervisor = new Supervisor(new Transport()); + private final Supervisor supervisor; + + public RpcClient(int transportThreads) { + supervisor = new Supervisor(new Transport(transportThreads)); + } @Override public NodeConnection createConnection(String hostname, int port) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index cccf8dd3693..ca2a0c9bfb0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -33,23 +33,23 @@ public class RpcResourcePool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; - public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) { + public RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) { var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); this.nodeConnectionPools = builder.build(); } public RpcResourcePool(DispatchConfig dispatchConfig) { - var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors()); - for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) { - clients.add(new RpcClient()); - } + var client = new RpcClient(dispatchConfig.numJrtTransportThreads()); - // Create node rpc connection pools, indexed by the node distribution key + // Create rpc node connection pools indexed by the node distribution key var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); + var numConnections = dispatchConfig.numJrtConnectionsPerNode(); for (var node : dispatchConfig.node()) { - var connections = new ArrayList<Client.NodeConnection>(clients.size()); - clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port()))); + var connections = new ArrayList<NodeConnection>(numConnections); + for (int i = 0; i < numConnections; i++) { + connections.add(client.createConnection(node.host(), node.port())); + } builder.put(node.key(), new NodeConnectionPool(connections)); } this.nodeConnectionPools = builder.build(); @@ -80,7 +80,7 @@ public class RpcResourcePool { private class NodeConnectionPool { private final List<Client.NodeConnection> connections; - NodeConnectionPool(List<Client.NodeConnection> connections) { + NodeConnectionPool(List<NodeConnection> connections) { this.connections = connections; } @@ -93,4 +93,5 @@ public class RpcResourcePool { connections.forEach(Client.NodeConnection::close); } } + } |