summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-26 11:43:28 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-26 11:43:28 +0100
commit4688a99d38d22e346282d993bc558d5b37676487 (patch)
treeedc1449bbff3f8260fde43107ab2781da46f4250 /container-search
parent6b2223c7e5897929a0570434d66ff08f554b6317 (diff)
Need a delayed close for the connections no longer needed.
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java27
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java16
2 files changed, 32 insertions, 11 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index afaa9afab33..63530a7f650 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -7,6 +7,7 @@ import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,11 +38,18 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
numConnections = dispatchConfig.numJrtConnectionsPerNode();
- updateNodes(nodesConfig);
+ updateNodes(nodesConfig).forEach(item -> {
+ try {
+ item.close();
+ } catch (Exception e) {}
+ });
}
- public void updateNodes(DispatchNodesConfig nodesConfig) {
+ /** Will return a list of items that need a delayed close */
+ public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
+ List<AutoCloseable> toClose = new ArrayList<>();
var builder = new HashMap<Integer, NodeConnectionPool>();
+ // Who can be reused
for (var node : nodesConfig.node()) {
var prev = nodeConnectionPools.get(node.key());
NodeConnection nc = prev != null ? prev.nextConnection() : null;
@@ -51,7 +59,6 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
{
builder.put(node.key(), prev);
} else {
- if (prev != null) prev.release();
var connections = new ArrayList<NodeConnection>(numConnections);
for (int i = 0; i < numConnections; i++) {
connections.add(rpcClient.createConnection(node.host(), node.port()));
@@ -59,7 +66,15 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
builder.put(node.key(), new NodeConnectionPool(connections));
}
}
+ // Who are not needed any more
+ nodeConnectionPools.forEach((key, pool) -> {
+ var survivor = builder.get(key);
+ if (survivor == null || pool != survivor) {
+ toClose.add(pool);
+ }
+ });
this.nodeConnectionPools = Map.copyOf(builder);
+ return toClose;
}
@Override
@@ -74,13 +89,13 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
@Override
public void close() {
- nodeConnectionPools.values().forEach(NodeConnectionPool::release);
+ nodeConnectionPools.values().forEach(NodeConnectionPool::close);
if (rpcClient != null) {
rpcClient.close();
}
}
- private static class NodeConnectionPool {
+ private static class NodeConnectionPool implements AutoCloseable {
private final List<Client.NodeConnection> connections;
NodeConnectionPool(List<NodeConnection> connections) {
@@ -92,7 +107,7 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
return connections.get(slot);
}
- void release() {
+ public void close() {
connections.forEach(Client.NodeConnection::close);
}
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index ff37352b39c..7c1e7372507 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -73,8 +73,14 @@ public class RpcSearchInvokerTest {
assertEquals(maxHits, request.getHits());
}
- void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup) {
- rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup));
+ void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup, int expectNeedCloseCount) {
+ var toClose = rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup));
+ assertEquals(expectNeedCloseCount, toClose.size());
+ toClose.forEach(item -> {
+ try {
+ item.close();
+ } catch (Exception e) {}
+ });
for (int nodeId = 0; nodeId < numGroups*nodesPerGroup; nodeId++) {
assertTrue(rpcResourcePool.getConnection(nodeId) instanceof RpcClient.RpcNodeConnection);
}
@@ -84,9 +90,9 @@ public class RpcSearchInvokerTest {
@Test
void testUpdateOfRpcResourcePool() {
RpcResourcePool rpcResourcePool = new RpcResourcePool(createDispatchConfig(), createNodesConfig(0, 0));
- verifyConnections(rpcResourcePool, 3,3);
- verifyConnections(rpcResourcePool, 4,4);
- verifyConnections(rpcResourcePool, 2,2);
+ verifyConnections(rpcResourcePool, 3,3, 0);
+ verifyConnections(rpcResourcePool, 4,4, 6);
+ verifyConnections(rpcResourcePool, 2,2, 14);
}
private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder,