summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-05-14 16:14:52 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2019-05-14 16:14:52 +0200
commit9107a86f5eb88092881de69b47a7ac9739b92a28 (patch)
tree0f9fb640aac3d315088ea46d35e8f1a50fcaa6db /container-search
parent81cda304c2c58e381f334a90817dd1ea63a56fe0 (diff)
Use one JRT Supervisor now that it supports multiple transport threads
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java19
2 files changed, 15 insertions, 10 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 7e48733106a..ea123b255eb 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -22,7 +22,11 @@ import java.util.List;
* @author bratseth
*/
class RpcClient implements Client {
- private final Supervisor supervisor = new Supervisor(new Transport());
+ private final Supervisor supervisor;
+
+ public RpcClient(int transportThreads) {
+ supervisor = new Supervisor(new Transport(transportThreads));
+ }
@Override
public NodeConnection createConnection(String hostname, int port) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index cccf8dd3693..ca2a0c9bfb0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -33,23 +33,23 @@ public class RpcResourcePool {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
- public RpcResourcePool(Map<Integer, Client.NodeConnection> nodeConnections) {
+ public RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
this.nodeConnectionPools = builder.build();
}
public RpcResourcePool(DispatchConfig dispatchConfig) {
- var clients = new ArrayList<RpcClient>(dispatchConfig.numJrtSupervisors());
- for (int i = 0; i < dispatchConfig.numJrtSupervisors(); i++) {
- clients.add(new RpcClient());
- }
+ var client = new RpcClient(dispatchConfig.numJrtTransportThreads());
- // Create node rpc connection pools, indexed by the node distribution key
+ // Create rpc node connection pools indexed by the node distribution key
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
+ var numConnections = dispatchConfig.numJrtConnectionsPerNode();
for (var node : dispatchConfig.node()) {
- var connections = new ArrayList<Client.NodeConnection>(clients.size());
- clients.forEach(client -> connections.add(client.createConnection(node.host(), node.port())));
+ var connections = new ArrayList<NodeConnection>(numConnections);
+ for (int i = 0; i < numConnections; i++) {
+ connections.add(client.createConnection(node.host(), node.port()));
+ }
builder.put(node.key(), new NodeConnectionPool(connections));
}
this.nodeConnectionPools = builder.build();
@@ -80,7 +80,7 @@ public class RpcResourcePool {
private class NodeConnectionPool {
private final List<Client.NodeConnection> connections;
- NodeConnectionPool(List<Client.NodeConnection> connections) {
+ NodeConnectionPool(List<NodeConnection> connections) {
this.connections = connections;
}
@@ -93,4 +93,5 @@ public class RpcResourcePool {
connections.forEach(Client.NodeConnection::close);
}
}
+
}