aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/rpc')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java47
5 files changed, 39 insertions, 32 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 22ed8b6d9fa..6c1f666835c 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -12,7 +12,7 @@ import java.util.Optional;
*
* @author bratseth
*/
-interface Client {
+public interface Client {
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
index fd8e0e4f81a..a93ddb0b360 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -1,11 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+
+import java.util.Collection;
+import java.util.List;
+
/**
* Interface for getting a connection given a node id.
*
* @author balderersheim
*/
-public interface RpcConnectionPool {
+public interface RpcConnectionPool extends AutoCloseable {
+
+ /** Returns a connection to the given node id. */
Client.NodeConnection getConnection(int nodeId);
+
+
+ /** Will return a list of items that need a delayed close when updating node set. */
+ default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); }
+
+ /** Shuts down all connections in the pool, and the underlying RPC client. */
+ @Override
+ void close();
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 154002c4f77..b6228994ac8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory {
super(cluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
this.compressor = new CompressService();
- decodeType = convert(dispatchConfig.summaryDecodePolicy());
+ this.decodeType = convert(dispatchConfig.summaryDecodePolicy());
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 53dc54f7bc5..a59097e5fff 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
this.clusterMonitor = clusterMonitor;
this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
- this. compressor = compressor;
+ this.compressor = compressor;
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 63530a7f650..d1f22514481 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
+import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig.Node;
import java.util.ArrayList;
import java.util.Collection;
@@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
+public class RpcResourcePool implements RpcConnectionPool {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
@@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
}
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
- super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
numConnections = dispatchConfig.numJrtConnectionsPerNode();
- updateNodes(nodesConfig).forEach(item -> {
- try {
- item.close();
- } catch (Exception e) {}
+ updateNodes(nodesConfig).forEach(pool -> {
+ try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw.
});
}
- /** Will return a list of items that need a delayed close */
- public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
- List<AutoCloseable> toClose = new ArrayList<>();
- var builder = new HashMap<Integer, NodeConnectionPool>();
+ @Override
+ public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
+ Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools);
+ Map<Integer, NodeConnectionPool> nextPools = new HashMap<>();
// Who can be reused
- for (var node : nodesConfig.node()) {
- var prev = nodeConnectionPools.get(node.key());
- NodeConnection nc = prev != null ? prev.nextConnection() : null;
- if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection
- && rpcNodeConnection.getPort() == node.port()
- && rpcNodeConnection.getHostname().equals(node.host()))
+ for (Node node : nodesConfig.node()) {
+ if ( currentPools.containsKey(node.key())
+ && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection
+ && rpcNodeConnection.getPort() == node.port()
+ && rpcNodeConnection.getHostname().equals(node.host()))
{
- builder.put(node.key(), prev);
+ nextPools.put(node.key(), currentPools.remove(node.key()));
} else {
- var connections = new ArrayList<NodeConnection>(numConnections);
+ ArrayList<NodeConnection> connections = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; i++) {
connections.add(rpcClient.createConnection(node.host(), node.port()));
}
- builder.put(node.key(), new NodeConnectionPool(connections));
+ nextPools.put(node.key(), new NodeConnectionPool(connections));
}
}
- // Who are not needed any more
- nodeConnectionPools.forEach((key, pool) -> {
- var survivor = builder.get(key);
- if (survivor == null || pool != survivor) {
- toClose.add(pool);
- }
- });
- this.nodeConnectionPools = Map.copyOf(builder);
- return toClose;
+ this.nodeConnectionPools = Map.copyOf(nextPools);
+ return currentPools.values();
}
@Override