diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-26 11:43:28 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-26 11:43:28 +0100 |
commit | 4688a99d38d22e346282d993bc558d5b37676487 (patch) | |
tree | edc1449bbff3f8260fde43107ab2781da46f4250 /container-search | |
parent | 6b2223c7e5897929a0570434d66ff08f554b6317 (diff) |
Need a delayed close for the connections no longer needed.
Diffstat (limited to 'container-search')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java | 27 | ||||
-rw-r--r-- | container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java | 16 |
2 files changed, 32 insertions, 11 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index afaa9afab33..63530a7f650 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -7,6 +7,7 @@ import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,11 +38,18 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); numConnections = dispatchConfig.numJrtConnectionsPerNode(); - updateNodes(nodesConfig); + updateNodes(nodesConfig).forEach(item -> { + try { + item.close(); + } catch (Exception e) {} + }); } - public void updateNodes(DispatchNodesConfig nodesConfig) { + /** Will return a list of items that need a delayed close */ + public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { + List<AutoCloseable> toClose = new ArrayList<>(); var builder = new HashMap<Integer, NodeConnectionPool>(); + // Who can be reused for (var node : nodesConfig.node()) { var prev = nodeConnectionPools.get(node.key()); NodeConnection nc = prev != null ? prev.nextConnection() : null; @@ -51,7 +59,6 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { { builder.put(node.key(), prev); } else { - if (prev != null) prev.release(); var connections = new ArrayList<NodeConnection>(numConnections); for (int i = 0; i < numConnections; i++) { connections.add(rpcClient.createConnection(node.host(), node.port())); @@ -59,7 +66,15 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { builder.put(node.key(), new NodeConnectionPool(connections)); } } + // Who are not needed any more + nodeConnectionPools.forEach((key, pool) -> { + var survivor = builder.get(key); + if (survivor == null || pool != survivor) { + toClose.add(pool); + } + }); this.nodeConnectionPools = Map.copyOf(builder); + return toClose; } @Override @@ -74,13 +89,13 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { @Override public void close() { - nodeConnectionPools.values().forEach(NodeConnectionPool::release); + nodeConnectionPools.values().forEach(NodeConnectionPool::close); if (rpcClient != null) { rpcClient.close(); } } - private static class NodeConnectionPool { + private static class NodeConnectionPool implements AutoCloseable { private final List<Client.NodeConnection> connections; NodeConnectionPool(List<NodeConnection> connections) { @@ -92,7 +107,7 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { return connections.get(slot); } - void release() { + public void close() { connections.forEach(Client.NodeConnection::close); } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index ff37352b39c..7c1e7372507 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -73,8 +73,14 @@ public class RpcSearchInvokerTest { assertEquals(maxHits, request.getHits()); } - void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup) { - rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup)); + void verifyConnections(RpcResourcePool rpcResourcePool, int numGroups, int nodesPerGroup, int expectNeedCloseCount) { + var toClose = rpcResourcePool.updateNodes(createNodesConfig(numGroups,nodesPerGroup)); + assertEquals(expectNeedCloseCount, toClose.size()); + toClose.forEach(item -> { + try { + item.close(); + } catch (Exception e) {} + }); for (int nodeId = 0; nodeId < numGroups*nodesPerGroup; nodeId++) { assertTrue(rpcResourcePool.getConnection(nodeId) instanceof RpcClient.RpcNodeConnection); } @@ -84,9 +90,9 @@ public class RpcSearchInvokerTest { @Test void testUpdateOfRpcResourcePool() { RpcResourcePool rpcResourcePool = new RpcResourcePool(createDispatchConfig(), createNodesConfig(0, 0)); - verifyConnections(rpcResourcePool, 3,3); - verifyConnections(rpcResourcePool, 4,4); - verifyConnections(rpcResourcePool, 2,2); + verifyConnections(rpcResourcePool, 3,3, 0); + verifyConnections(rpcResourcePool, 4,4, 6); + verifyConnections(rpcResourcePool, 2,2, 14); } private Client parameterCollectorClient(AtomicReference<CompressionType> compressionTypeHolder, AtomicReference<byte[]> payloadHolder, |